This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f89d649  [Broker] PIP:84 Redeliver command add epoch. (#10478)
f89d649 is described below

commit f89d6490371177cbb8b3f3b08f1bf2203e420dc3
Author: congbo <[email protected]>
AuthorDate: Mon Feb 14 22:20:12 2022 +0800

    [Broker] PIP:84 Redeliver command add epoch. (#10478)
    
    ## Motivation
    detail in 
https://github.com/apache/pulsar/wiki/PIP-84-%3A-Pulsar-client%3A-Redeliver-command-add-epoch.
    ### Verifying this change
    Add the tests for it
    
    Does this pull request potentially affect one of the following parts:
    If yes was chosen, please highlight the changes
---
 .../org/apache/pulsar/broker/service/Consumer.java |  26 +-
 .../apache/pulsar/broker/service/Dispatcher.java   |   2 +-
 .../pulsar/broker/service/PulsarCommandSender.java |   2 +-
 .../broker/service/PulsarCommandSenderImpl.java    |   4 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  32 ++-
 .../apache/pulsar/broker/service/Subscription.java |   2 +-
 .../pulsar/broker/service/SubscriptionOption.java  |   1 +
 .../nonpersistent/NonPersistentDispatcher.java     |   2 +-
 .../nonpersistent/NonPersistentSubscription.java   |   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   5 +-
 .../PersistentDispatcherMultipleConsumers.java     |   2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  72 ++++-
 ...entStreamingDispatcherSingleActiveConsumer.java |   3 +-
 .../service/persistent/PersistentSubscription.java |   4 +-
 .../broker/service/persistent/PersistentTopic.java |  26 +-
 .../pendingack/impl/PendingAckHandleImpl.java      |   4 +-
 .../apache/pulsar/client/impl/RawReaderImpl.java   |   5 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  19 +-
 .../PersistentDispatcherFailoverConsumerTest.java  |  19 +-
 .../broker/service/PersistentTopicE2ETest.java     |   3 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  49 ++--
 .../client/impl/CompactedOutBatchMessageTest.java  |   5 +-
 .../pulsar/client/impl/MessageRedeliveryTest.java  | 133 +++++++++-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  11 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  28 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 294 +++++++++++++--------
 .../org/apache/pulsar/client/impl/MessageImpl.java |  45 ++--
 .../client/impl/MessagePayloadContextImpl.java     |  14 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |  30 ++-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   2 +-
 .../apache/pulsar/common/protocol/Commands.java    |  27 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   5 +
 32 files changed, 632 insertions(+), 246 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 0d26250..9e53438 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 import io.netty.util.concurrent.Future;
@@ -33,6 +34,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -131,11 +134,15 @@ public class Consumer {
     private final MessageId startMessageId;
     private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
 
+    @Getter
+    @Setter
+    private volatile long consumerEpoch;
+
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     boolean isDurable, TransportCnx cnx, String appId,
                     Map<String, String> metadata, boolean readCompacted, 
InitialPosition subscriptionInitialPosition,
-                    KeySharedMeta keySharedMeta, MessageId startMessageId) {
+                    KeySharedMeta keySharedMeta, MessageId startMessageId, 
long consumerEpoch) {
 
         this.subscription = subscription;
         this.subType = subType;
@@ -186,6 +193,7 @@ public class Consumer {
         }
 
         this.clientAddress = cnx.clientSourceAddress();
+        this.consumerEpoch = consumerEpoch;
         this.isAcknowledgmentAtBatchIndexLevelEnabled = 
subscription.getTopic().getBrokerService()
                 
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
     }
@@ -214,6 +222,14 @@ public class Consumer {
         return readCompacted;
     }
 
+    public Future<Void> sendMessages(final List<Entry> entries, 
EntryBatchSizes batchSizes,
+                                     EntryBatchIndexesAcks batchIndexesAcks,
+                                     int totalMessages, long totalBytes, long 
totalChunkedMessages,
+                                     RedeliveryTracker redeliveryTracker) {
+        return sendMessages(entries, batchSizes, batchIndexesAcks, 
totalMessages, totalBytes,
+                totalChunkedMessages, redeliveryTracker, 
DEFAULT_CONSUMER_EPOCH);
+    }
+
     /**
      * Dispatch a list of entries to the consumer. <br/>
      * <b>It is also responsible to release entries data and recycle entries 
object.</b>
@@ -223,7 +239,7 @@ public class Consumer {
     public Future<Void> sendMessages(final List<Entry> entries, 
EntryBatchSizes batchSizes,
                                      EntryBatchIndexesAcks batchIndexesAcks,
                                      int totalMessages, long totalBytes, long 
totalChunkedMessages,
-                                     RedeliveryTracker redeliveryTracker) {
+                                     RedeliveryTracker redeliveryTracker, long 
epoch) {
         this.lastConsumedTimestamp = System.currentTimeMillis();
 
         if (entries.isEmpty() || totalMessages == 0) {
@@ -286,7 +302,7 @@ public class Consumer {
 
 
         return cnx.getCommandSender().sendMessagesToConsumer(consumerId, 
topicName, subscription, partitionIdx,
-                entries, batchSizes, batchIndexesAcks, redeliveryTracker);
+                entries, batchSizes, batchIndexesAcks, redeliveryTracker, 
epoch);
     }
 
     private void incrementUnackedMessages(int ackedMessages) {
@@ -850,7 +866,7 @@ public class Consumer {
         return priorityLevel;
     }
 
-    public void redeliverUnacknowledgedMessages() {
+    public void redeliverUnacknowledgedMessages(long consumerEpoch) {
         // cleanup unackedMessage bucket and redeliver those unack-msgs again
         clearUnAckedMsgs();
         blockedConsumerOnUnackedMsgs = false;
@@ -875,7 +891,7 @@ public class Consumer {
             
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), 
totalRedeliveryMessages.intValue());
             subscription.redeliverUnacknowledgedMessages(this, 
pendingPositions);
         } else {
-            subscription.redeliverUnacknowledgedMessages(this);
+            subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
         }
 
         flowConsumerBlockedPermits(this);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 7dab8b6..918f0d2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -80,7 +80,7 @@ public interface Dispatcher {
 
     SubType getType();
 
-    void redeliverUnacknowledgedMessages(Consumer consumer);
+    void redeliverUnacknowledgedMessages(Consumer consumer, long 
consumerEpoch);
 
     void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> 
positions);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 6fcbb6d..0ecda2d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -77,7 +77,7 @@ public interface PulsarCommandSender {
 
     Future<Void> sendMessagesToConsumer(long consumerId, String topicName, 
Subscription subscription,
             int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, 
EntryBatchIndexesAcks batchIndexesAcks,
-            RedeliveryTracker redeliveryTracker);
+            RedeliveryTracker redeliveryTracker, long epoch);
 
     void sendTcClientConnectResponse(long requestId, ServerError error, String 
message);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 03decb4..7c1a920 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -217,7 +217,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
     @Override
     public ChannelPromise sendMessagesToConsumer(long consumerId, String 
topicName, Subscription subscription,
             int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, 
EntryBatchIndexesAcks batchIndexesAcks,
-            RedeliveryTracker redeliveryTracker) {
+            RedeliveryTracker redeliveryTracker, long epoch) {
         final ChannelHandlerContext ctx = cnx.ctx();
         final ChannelPromise writePromise = ctx.newPromise();
         ctx.channel().eventLoop().execute(() -> {
@@ -270,7 +270,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 ctx.write(
                         cnx.newMessageAndIntercept(consumerId, 
entry.getLedgerId(), entry.getEntryId(), partitionIdx,
                                 redeliveryCount, metadataAndPayload,
-                                batchIndexesAcks == null ? null : 
batchIndexesAcks.getAckSet(i), topicName),
+                                batchIndexesAcks == null ? null : 
batchIndexesAcks.getAckSet(i), topicName, epoch),
                         ctx.voidPromise());
                 entry.release();
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0dfa56f4..ef09a80 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -23,6 +23,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
 import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
 import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static 
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
@@ -952,6 +953,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 subscriptionName,
                 TopicOperation.CONSUME
         );
+
+        // Make sure the consumer future is put into the consumers map first 
to avoid the same consumer
+        // epoch using different consumer futures, and only remove the 
consumer future from the map
+        // if subscribe failed .
+        CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
+        CompletableFuture<Consumer> existingConsumerFuture =
+                consumers.putIfAbsent(consumerId, consumerFuture);
         isAuthorizedFuture.thenApply(isAuthorized -> {
             if (isAuthorized) {
                 if (log.isDebugEnabled()) {
@@ -964,12 +972,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     Metadata.validateMetadata(metadata);
                 } catch (IllegalArgumentException iae) {
                     final String msg = iae.getMessage();
+                    consumers.remove(consumerId, consumerFuture);
                     commandSender.sendErrorResponse(requestId, 
ServerError.MetadataError, msg);
                     return null;
                 }
-                CompletableFuture<Consumer> consumerFuture = new 
CompletableFuture<>();
-                CompletableFuture<Consumer> existingConsumerFuture = 
consumers.putIfAbsent(consumerId,
-                        consumerFuture);
 
                 if (existingConsumerFuture != null) {
                     if (existingConsumerFuture.isDone() && 
!existingConsumerFuture.isCompletedExceptionally()) {
@@ -1024,6 +1030,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                                 new 
SubscriptionNotFoundException(
                                                         "Subscription does not 
exist"));
                             }
+                            long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+                            if (subscribe.hasConsumerEpoch()) {
+                                consumerEpoch = subscribe.getConsumerEpoch();
+                            }
                             SubscriptionOption option = 
SubscriptionOption.builder().cnx(ServerCnx.this)
                                     .subscriptionName(subscriptionName)
                                     
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
@@ -1034,6 +1044,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                     
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
                                     
.subscriptionProperties(SubscriptionOption.getPropertiesMap(
                                             
subscribe.getSubscriptionPropertiesList()))
+                                    .consumerEpoch(consumerEpoch)
                                     .build();
                             if (schema != null) {
                                 return 
topic.addSchemaIfIdleOrCheckCompatible(schema)
@@ -1101,11 +1112,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             } else {
                 String msg = "Client is not authorized to subscribe";
                 log.warn("[{}] {} with role {}", remoteAddress, msg, 
getPrincipal());
+                consumers.remove(consumerId, consumerFuture);
                 ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
             }
             return null;
         }).exceptionally(ex -> {
             logAuthException(remoteAddress, "subscribe", getPrincipal(), 
Optional.of(topicName), ex);
+            consumers.remove(consumerId, consumerFuture);
             commandSender.sendErrorResponse(requestId, 
ServerError.AuthorizationError, ex.getMessage());
             return null;
         });
@@ -1533,7 +1546,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     protected void 
handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver) 
{
         checkArgument(state == State.Connected);
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received Resend Command from consumer {} ", 
remoteAddress, redeliver.getConsumerId());
+            log.debug("[{}] redeliverUnacknowledged from consumer {}, 
consumerEpoch {}",
+                    remoteAddress, redeliver.getConsumerId(), 
redeliver.getConsumerEpoch());
         }
 
         CompletableFuture<Consumer> consumerFuture = 
consumers.get(redeliver.getConsumerId());
@@ -1543,7 +1557,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             if (redeliver.getMessageIdsCount() > 0 && 
Subscription.isIndividualAckMode(consumer.subType())) {
                 
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
             } else {
-                consumer.redeliverUnacknowledgedMessages();
+                if (redeliver.hasConsumerEpoch()) {
+                    
consumer.redeliverUnacknowledgedMessages(redeliver.getConsumerEpoch());
+                } else {
+                    
consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH);
+                }
             }
         }
     }
@@ -2693,9 +2711,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, 
long entryId, int partition,
-            int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet, 
String topic) {
+            int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet, 
String topic, long epoch) {
         BaseCommand command = Commands.newMessageCommand(consumerId, ledgerId, 
entryId, partition, redeliveryCount,
-                ackSet);
+                ackSet, epoch);
         ByteBufPair res = Commands.serializeCommandMessageWithSize(command, 
metadataAndPayload);
         try {
             val brokerInterceptor = getBrokerService().getInterceptor();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 555b10d..06db648 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -87,7 +87,7 @@ public interface Subscription {
 
     boolean expireMessages(Position position);
 
-    void redeliverUnacknowledgedMessages(Consumer consumer);
+    void redeliverUnacknowledgedMessages(Consumer consumer, long 
consumerEpoch);
 
     void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> 
positions);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
index 278e008..d7503f6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
@@ -48,6 +48,7 @@ public class SubscriptionOption {
     private boolean replicatedSubscriptionStateArg;
     private KeySharedMeta keySharedMeta;
     private Optional<Map<String, String>> subscriptionProperties;
+    private long consumerEpoch;
 
     public static Optional<Map<String, String>> 
getPropertiesMap(List<KeyValue> list) {
         if (list == null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
index c5d4ec4..1ef2db2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
@@ -56,7 +56,7 @@ public interface NonPersistentDispatcher extends Dispatcher {
     boolean hasPermits();
 
     @Override
-    default void redeliverUnacknowledgedMessages(Consumer consumer) {
+    default void redeliverUnacknowledgedMessages(Consumer consumer, long 
consumerEpoch) {
         // No-op
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 056fd0f..790725f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -471,7 +471,7 @@ public class NonPersistentSubscription implements 
Subscription {
     }
 
     @Override
-    public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer) {
+    public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, long consumerEpoch) {
      // No-op
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index a90fb30..86d74a7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.nonpersistent;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
 import static 
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.carrotsearch.hppc.ObjectObjectHashMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -302,9 +303,11 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 
             NonPersistentSubscription subscription = 
subscriptions.computeIfAbsent(subscriptionName,
                     name -> new NonPersistentSubscription(this, 
subscriptionName, isDurable));
+
             Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel, consumerName,
                     false, cnx, cnx.getAuthRole(), metadata, readCompacted, 
initialPosition, keySharedMeta,
-                    MessageId.latest);
+                    MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
             addConsumerToSubscription(subscription, consumer).thenRun(() -> {
                 if (!cnx.isActive()) {
                     try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0def1227..9bc3598 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -728,7 +728,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     }
 
     @Override
-    public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer) {
+    public synchronized void redeliverUnacknowledgedMessages(Consumer 
consumer, long consumerEpoch) {
         consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
             addMessageToReplay(ledgerId, entryId, stickyKeyHash);
         });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4937e86..d732523 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import io.netty.util.Recycler;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
@@ -148,7 +150,10 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
     }
 
     public synchronized void internalReadEntriesComplete(final List<Entry> 
entries, Object obj) {
-        Consumer readConsumer = (Consumer) obj;
+        ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx) obj;
+        Consumer readConsumer = readEntriesCtx.getConsumer();
+        long epoch = readEntriesCtx.getEpoch();
+        readEntriesCtx.recycle();
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Got messages: {}", name, readConsumer, 
entries.size());
         }
@@ -201,17 +206,17 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(entries.size());
             filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, 
batchIndexesAcks, cursor, false);
-            dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, 
batchIndexesAcks, sendMessageInfo);
+            dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, 
batchIndexesAcks, sendMessageInfo, epoch);
         }
     }
 
     protected void dispatchEntriesToConsumer(Consumer currentConsumer, 
List<Entry> entries,
                                              EntryBatchSizes batchSizes, 
EntryBatchIndexesAcks batchIndexesAcks,
-                                             SendMessageInfo sendMessageInfo) {
+                                             SendMessageInfo sendMessageInfo, 
long epoch) {
         currentConsumer
             .sendMessages(entries, batchSizes, batchIndexesAcks, 
sendMessageInfo.getTotalMessages(),
                     sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(),
-                    redeliveryTracker)
+                    redeliveryTracker, epoch)
             .addListener(future -> {
                 if (future.isSuccess()) {
                     int permits = dispatchThrottlingOnBatchMessageEnabled ? 
entries.size()
@@ -283,13 +288,13 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Consumer consumer) {
+    public void redeliverUnacknowledgedMessages(Consumer consumer, long 
consumerEpoch) {
         
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, 
SafeRun.safeRun(() -> {
-            internalRedeliverUnacknowledgedMessages(consumer);
+            internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch);
         }));
     }
 
-    private synchronized void internalRedeliverUnacknowledgedMessages(Consumer 
consumer) {
+    private synchronized void internalRedeliverUnacknowledgedMessages(Consumer 
consumer, long consumerEpoch) {
         if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
             log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only 
the active consumer can call resend",
                     name, consumer);
@@ -305,6 +310,9 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         cancelPendingRead();
 
         if (!havePendingRead) {
+            if (consumerEpoch > consumer.getConsumerEpoch()) {
+                consumer.setConsumerEpoch(consumerEpoch);
+            }
             cursor.rewind();
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Cursor rewinded, redelivering 
unacknowledged messages. ", name, consumer);
@@ -321,7 +329,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
     public void redeliverUnacknowledgedMessages(Consumer consumer, 
List<PositionImpl> positions) {
         // We cannot redeliver single messages to single consumers to preserve 
ordering.
         positions.forEach(redeliveryTracker::addIfAbsent);
-        redeliverUnacknowledgedMessages(consumer);
+        redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH);
     }
 
     @Override
@@ -351,8 +359,10 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                 topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, isFirstRead,
                         this, consumer);
             } else {
+                ReadEntriesCtx readEntriesCtx =
+                        ReadEntriesCtx.create(consumer, 
consumer.getConsumerEpoch());
                 cursor.asyncReadEntriesOrWait(messagesToRead,
-                        bytesToRead, this, consumer, 
topic.getMaxReadPosition());
+                        bytesToRead, this, readEntriesCtx, 
topic.getMaxReadPosition());
             }
         } else {
             if (log.isDebugEnabled()) {
@@ -467,7 +477,9 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     private synchronized void internalReadEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
         havePendingRead = false;
-        Consumer c = (Consumer) ctx;
+        ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx) ctx;
+        Consumer c = readEntriesCtx.getConsumer();
+        readEntriesCtx.recycle();
 
         long waitTimeMillis = readFailureBackoff.next();
 
@@ -584,4 +596,44 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
+
+    public static class ReadEntriesCtx {
+
+        private Consumer consumer;
+        private long epoch;
+
+        private final Recycler.Handle<ReadEntriesCtx> recyclerHandle;
+
+        private ReadEntriesCtx(Recycler.Handle<ReadEntriesCtx> recyclerHandle) 
{
+            this.recyclerHandle = recyclerHandle;
+        }
+        private static final Recycler<ReadEntriesCtx> RECYCLER =
+                new Recycler<ReadEntriesCtx>() {
+            @Override
+            protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx> 
recyclerHandle) {
+                return new ReadEntriesCtx(recyclerHandle);
+            }
+        };
+
+        public static ReadEntriesCtx create(Consumer consumer, long epoch) {
+            ReadEntriesCtx readEntriesCtx = RECYCLER.get();
+            readEntriesCtx.consumer = consumer;
+            readEntriesCtx.epoch = epoch;
+            return readEntriesCtx;
+        }
+
+        Consumer getConsumer() {
+            return consumer;
+        }
+
+        long getEpoch() {
+            return epoch;
+        }
+
+        public void recycle() {
+            consumer = null;
+            epoch = 0;
+            recyclerHandle.recycle(this);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 2e92232..0004ffd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.google.common.collect.Lists;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -169,7 +170,7 @@ public class 
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
             cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
                     .getNextValidPosition((PositionImpl) entry.getPosition()));
             dispatchEntriesToConsumer(currentConsumer, 
Lists.newArrayList(entry), batchSizes,
-                    batchIndexesAcks, sendMessageInfo);
+                    batchIndexesAcks, sendMessageInfo, DEFAULT_CONSUMER_EPOCH);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 80c7869..6d74b53 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1031,10 +1031,10 @@ public class PersistentSubscription implements 
Subscription {
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Consumer consumer) {
+    public void redeliverUnacknowledgedMessages(Consumer consumer, long 
consumerEpoch) {
         Dispatcher dispatcher = getDispatcher();
         if (dispatcher != null) {
-            dispatcher.redeliverUnacknowledgedMessages(consumer);
+            dispatcher.redeliverUnacknowledgedMessages(consumer, 
consumerEpoch);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 78610f9..9337fb7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.carrotsearch.hppc.ObjectObjectHashMap;
 import com.google.common.annotations.VisibleForTesting;
@@ -652,18 +653,20 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 option.getStartMessageId(), option.getMetadata(), 
option.isReadCompacted(),
                 option.getInitialPosition(), 
option.getStartMessageRollbackDurationSec(),
                 option.isReplicatedSubscriptionStateArg(), 
option.getKeySharedMeta(),
-                
option.getSubscriptionProperties().orElse(Collections.emptyMap()));
+                
option.getSubscriptionProperties().orElse(Collections.emptyMap()), 
option.getConsumerEpoch());
     }
 
     private CompletableFuture<Consumer> internalSubscribe(final TransportCnx 
cnx, String subscriptionName,
-                                                 long consumerId, SubType 
subType, int priorityLevel,
-                                                 String consumerName, boolean 
isDurable, MessageId startMessageId,
-                                                 Map<String, String> metadata, 
boolean readCompacted,
-                                                 InitialPosition 
initialPosition,
-                                                 long 
startMessageRollbackDurationSec,
-                                                 boolean 
replicatedSubscriptionStateArg,
-                                                 KeySharedMeta keySharedMeta,
-                                                 Map<String, String> 
subscriptionProperties) {
+                                                          long consumerId, 
SubType subType, int priorityLevel,
+                                                          String consumerName, 
boolean isDurable,
+                                                          MessageId 
startMessageId,
+                                                          Map<String, String> 
metadata, boolean readCompacted,
+                                                          InitialPosition 
initialPosition,
+                                                          long 
startMessageRollbackDurationSec,
+                                                          boolean 
replicatedSubscriptionStateArg,
+                                                          KeySharedMeta 
keySharedMeta,
+                                                          Map<String, String> 
subscriptionProperties,
+                                                          long consumerEpoch) {
         if (readCompacted && !(subType == SubType.Failover || subType == 
SubType.Exclusive)) {
             return FutureUtil.failedFuture(new NotAllowedException(
                     "readCompacted only allowed on failover or exclusive 
subscriptions"));
@@ -750,7 +753,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             CompletableFuture<Consumer> future = 
subscriptionFuture.thenCompose(subscription -> {
                 Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel,
                         consumerName, isDurable, cnx, cnx.getAuthRole(), 
metadata,
-                        readCompacted, initialPosition, keySharedMeta, 
startMessageId);
+                        readCompacted, initialPosition, keySharedMeta, 
startMessageId, consumerEpoch);
+
                 return addConsumerToSubscription(subscription, 
consumer).thenCompose(v -> {
                     checkBackloggedCursors();
                     if (!cnx.isActive()) {
@@ -821,7 +825,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                                  KeySharedMeta keySharedMeta) {
         return internalSubscribe(cnx, subscriptionName, consumerId, subType, 
priorityLevel, consumerName,
                 isDurable, startMessageId, metadata, readCompacted, 
initialPosition, startMessageRollbackDurationSec,
-                replicatedSubscriptionStateArg, keySharedMeta, null);
+                replicatedSubscriptionStateArg, keySharedMeta, null, 
DEFAULT_CONSUMER_EPOCH);
     }
 
     private CompletableFuture<Subscription> getDurableSubscription(String 
subscriptionName,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 6fafc68..f001b39 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -514,7 +515,8 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
                         if (cumulativeAckOfTransaction.getKey().equals(txnId)) 
{
                             cumulativeAckOfTransaction = null;
                         }
-                        
persistentSubscription.redeliverUnacknowledgedMessages(consumer);
+                        //TODO: pendingAck handle next pr will fix
+                        
persistentSubscription.redeliverUnacknowledgedMessages(consumer, 
DEFAULT_CONSUMER_EPOCH);
                         abortFuture.complete(null);
                     }).exceptionally(e -> {
                         log.error("[{}] Transaction pending ack store abort 
txnId : [{}] fail!",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 217dd5c..d391e15 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandMessage;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
@@ -201,8 +202,8 @@ public class RawReaderImpl implements RawReader {
         }
 
         @Override
-        void messageReceived(MessageIdData messageId, int redeliveryCount,
-                             List<Long> ackSet, ByteBuf headersAndPayload, 
ClientCnx cnx) {
+        void messageReceived(CommandMessage commandMessage, ByteBuf 
headersAndPayload, ClientCnx cnx) {
+            MessageIdData messageId = commandMessage.getMessageId();
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, 
subscription,
                         messageId.getEntryId(), messageId.getLedgerId(), 
messageId.getPartition());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index f379500..2431d14 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.ComparisonChain;
@@ -42,6 +43,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Consumer;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawMessageImpl;
@@ -94,9 +96,12 @@ public class CompactedTopicImpl implements CompactedTopic {
             } else {
                 cursorPosition = (PositionImpl) cursor.getReadPosition();
             }
+
+            // TODO: redeliver epoch link 
https://github.com/apache/pulsar/issues/13690
+            ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, 
DEFAULT_CONSUMER_EPOCH);
             if (compactionHorizon == null
                 || compactionHorizon.compareTo(cursorPosition) < 0) {
-                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, 
consumer, PositionImpl.LATEST);
+                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, 
readEntriesCtx, PositionImpl.LATEST);
             } else {
                 compactedTopicContext.thenCompose(
                     (context) -> findStartPoint(cursorPosition, 
context.ledger.getLastAddConfirmed(), context.cache)
@@ -105,11 +110,11 @@ public class CompactedTopicImpl implements CompactedTopic 
{
                             // the cursor just needs to be set to the 
compaction horizon
                             if (startPoint == COMPACT_LEDGER_EMPTY) {
                                 cursor.seek(compactionHorizon.getNext());
-                                
callback.readEntriesComplete(Collections.emptyList(), consumer);
+                                
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
                                 return CompletableFuture.completedFuture(null);
                             }
                             if (startPoint == NEWER_THAN_COMPACTED && 
compactionHorizon.compareTo(cursorPosition) < 0) {
-                                
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer,
+                                
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx,
                                         PositionImpl.LATEST);
                                 return CompletableFuture.completedFuture(null);
                             } else {
@@ -117,7 +122,7 @@ public class CompactedTopicImpl implements CompactedTopic {
                                                          startPoint + 
numberOfEntriesToRead);
                                 if (startPoint == NEWER_THAN_COMPACTED) {
                                     cursor.seek(compactionHorizon.getNext());
-                                    
callback.readEntriesComplete(Collections.emptyList(), consumer);
+                                    
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
                                     return 
CompletableFuture.completedFuture(null);
                                 }
                                 return readEntries(context.ledger, startPoint, 
endPoint)
@@ -129,16 +134,16 @@ public class CompactedTopicImpl implements CompactedTopic 
{
                                         // the complete last snapshot because 
of the compactor will read the data
                                         // before the compaction cursor mark 
delete position
                                         
cursor.seek(lastEntry.getPosition().getNext(), true);
-                                        callback.readEntriesComplete(entries, 
consumer);
+                                        callback.readEntriesComplete(entries, 
readEntriesCtx);
                                     });
                             }
                         }))
                     .exceptionally((exception) -> {
                         if (exception.getCause() instanceof 
NoSuchElementException) {
                             cursor.seek(compactionHorizon.getNext());
-                            
callback.readEntriesComplete(Collections.emptyList(), consumer);
+                            
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
                         } else {
-                            callback.readEntriesFailed(new 
ManagedLedgerException(exception), consumer);
+                            callback.readEntriesFailed(new 
ManagedLedgerException(exception), readEntriesCtx);
                         }
                         return null;
                     });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 5700b1c..7e697e6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.matches;
 import static org.mockito.ArgumentMatchers.same;
@@ -293,7 +294,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 2. Add old consumer
         Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
-                "Cons1"/* consumer name */, true, serverCnxWithOldVersion, 
"myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, 
MessageId.latest);
+                "Cons1"/* consumer name */, true, serverCnxWithOldVersion, 
"myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -304,7 +305,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 3. Add new consumer
         Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0,
-                "Cons2"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
+                "Cons2"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
         pdfc.addConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -333,7 +334,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 2. Add consumer
         Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest));
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH));
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -357,7 +358,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 5. Add another consumer which does not change active consumer
         Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
-                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest));
+                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH));
         pdfc.addConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertSame(pdfc.getActiveConsumer().consumerName(), 
consumer1.consumerName());
@@ -371,7 +372,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 6. Add a consumer which changes active consumer
         Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 0 /* consumer id */, 0,
                 "Cons0"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest));
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH));
         pdfc.addConsumer(consumer0);
         consumers = pdfc.getConsumers();
         assertSame(pdfc.getActiveConsumer().consumerName(), 
consumer0.consumerName());
@@ -454,7 +455,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 2. Add a consumer
         Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, 
topic.getName(), 1 /* consumer id */, 1,
                 "Cons1"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest));
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH));
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertEquals(1, consumers.size());
@@ -463,7 +464,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 3. Add a consumer with same priority level and consumer name is 
smaller in lexicographic order.
         Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, 
topic.getName(), 2 /* consumer id */, 1,
                 "Cons2"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest));
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH));
         pdfc.addConsumer(consumer2);
 
         // 4. Verify active consumer doesn't change
@@ -476,7 +477,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 5. Add another consumer which has higher priority level
         Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, 
topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
-                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest));
+                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH));
         pdfc.addConsumer(consumer3);
         consumers = pdfc.getConsumers();
         assertEquals(3, consumers.size());
@@ -667,7 +668,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
         Consumer consumer =
                 new Consumer(sub, SubType.Shared, "test-topic", id, priority, 
""+id, true,
-                        serverCnx, "appId", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
+                        serverCnx, "appId", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, 
MessageId.latest,DEFAULT_CONSUMER_EPOCH);
         try {
             consumer.flowPermits(permit);
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 570c6cd..ac5230d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -1728,7 +1729,7 @@ public class PersistentTopicE2ETest extends 
BrokerTestBase {
 
         redeliveryMessagesField.set(dispatcher, redeliveryMessages);
         // (a) redelivery with all acked-message should clear messageReply 
bucket
-        
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
+        
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0), 
DEFAULT_CONSUMER_EPOCH);
         Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
             return redeliveryMessages.isEmpty();
         });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 68e0f4f..6d7882f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.anyString;
@@ -753,7 +754,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         Consumer consumer = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1, 0, "Cons1", true, serverCnx,
                 "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest,
-                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest);
+                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
         sub.addConsumer(consumer);
         consumer.close();
 
@@ -764,7 +765,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
             consumer = new Consumer(sub, subType, topic.getName(), 1, 0, 
"Cons1", true, serverCnx, "myrole-1",
                     Collections.emptyMap(), false, InitialPosition.Latest,
-                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest);
+                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
             sub.addConsumer(consumer);
 
             assertTrue(sub.getDispatcher().isConsumerConnected());
@@ -787,7 +788,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         // 1. simple add consumer
         Consumer consumer = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
+                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
         sub.addConsumer(consumer);
         assertTrue(sub.getDispatcher().isConsumerConnected());
 
@@ -820,7 +821,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         PersistentSubscription sub = new PersistentSubscription(topic, 
"non-durable-sub", cursorMock, false);
 
         Consumer consumer = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1, 0, "Cons1", true, serverCnx,
-                "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest, null, MessageId.latest);
+                "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
 
         sub.addConsumer(consumer);
         assertFalse(sub.getDispatcher().isClosed());
@@ -858,14 +859,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // 1. add consumer1
         Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 
1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         addConsumerToSubscription.invoke(topic, sub, consumer);
         assertEquals(sub.getConsumers().size(), 1);
 
         // 2. add consumer2
         Consumer consumer2 = new Consumer(sub, SubType.Shared, 
topic.getName(), 2 /* consumer id */, 0,
                 "Cons2"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         addConsumerToSubscription.invoke(topic, sub, consumer2);
         assertEquals(sub.getConsumers().size(), 2);
 
@@ -873,7 +874,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         try {
             Consumer consumer3 = new Consumer(sub, SubType.Shared, 
topic.getName(), 3 /* consumer id */, 0,
                     "Cons3"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
             ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, 
sub, consumer3)).get();
             fail("should have failed");
         } catch (ExecutionException e) {
@@ -886,7 +887,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // 4. add consumer4 to sub2
         Consumer consumer4 = new Consumer(sub2, SubType.Shared, 
topic.getName(), 4 /* consumer id */, 0,
                 "Cons4"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         addConsumerToSubscription.invoke(topic, sub2, consumer4);
         assertEquals(sub2.getConsumers().size(), 1);
 
@@ -897,7 +898,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         try {
             Consumer consumer5 = new Consumer(sub2, SubType.Shared, 
topic.getName(), 5 /* consumer id */, 0,
                     "Cons5"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
             ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, 
sub2, consumer5)).get();
             fail("should have failed");
         } catch (ExecutionException e) {
@@ -962,14 +963,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // 1. add consumer1
         Consumer consumer = new Consumer(sub, SubType.Failover, 
topic.getName(), 1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         addConsumerToSubscription.invoke(topic, sub, consumer);
         assertEquals(sub.getConsumers().size(), 1);
 
         // 2. add consumer2
         Consumer consumer2 = new Consumer(sub, SubType.Failover, 
topic.getName(), 2 /* consumer id */, 0,
                 "Cons2"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         addConsumerToSubscription.invoke(topic, sub, consumer2);
         assertEquals(sub.getConsumers().size(), 2);
 
@@ -977,7 +978,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         try {
             Consumer consumer3 = new Consumer(sub, SubType.Failover, 
topic.getName(), 3 /* consumer id */, 0,
                     "Cons3"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
             ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, 
sub, consumer3)).get();
             fail("should have failed");
         } catch (ExecutionException e) {
@@ -990,7 +991,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // 4. add consumer4 to sub2
         Consumer consumer4 = new Consumer(sub2, SubType.Failover, 
topic.getName(), 4 /* consumer id */, 0,
                 "Cons4"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         addConsumerToSubscription.invoke(topic, sub2, consumer4);
         assertEquals(sub2.getConsumers().size(), 1);
 
@@ -1001,7 +1002,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         try {
             Consumer consumer5 = new Consumer(sub2, SubType.Failover, 
topic.getName(), 5 /* consumer id */, 0,
                     "Cons5"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
+                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
             ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, 
sub2, consumer5)).get();
             fail("should have failed");
         } catch (ExecutionException e) {
@@ -1055,7 +1056,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(new PulsarCommandSenderImpl(null, 
cnx)).when(cnx).getCommandSender();
 
         return new Consumer(sub, SubType.Shared, topic.getName(), consumerId, 
0, consumerNameBase + consumerId, true,
-                cnx, role, Collections.emptyMap(), false, 
InitialPosition.Latest, null, MessageId.latest);
+                cnx, role, Collections.emptyMap(), false, 
InitialPosition.Latest, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
     }
 
     @Test
@@ -1163,7 +1164,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
         Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
+                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
         sub.addConsumer(consumer1);
 
         doAnswer(new Answer<Object>() {
@@ -1187,7 +1188,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             Thread.sleep(10); /* delay to ensure that the ubsubscribe gets 
executed first */
             sub.addConsumer(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */,
                     0, "Cons2"/* consumer name */, true, serverCnx,
-                    "myrole-1", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null, MessageId.latest)).get();
+                    "myrole-1", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH)).get();
             fail();
         } catch (Exception e) {
             assertTrue(e.getCause() instanceof 
BrokerServiceException.SubscriptionFencedException);
@@ -1965,21 +1966,21 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         ManagedCursor cursor1 = ledger.openCursor("c1");
         PersistentSubscription sub1 = new PersistentSubscription(topic, 
"sub-1", cursor1, false);
         Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-            true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null, MessageId.latest);
+            true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
         topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1);
         sub1.addConsumer(consumer1);
         // Open cursor2, add it into activeCursor-container and add it into 
subscription consumer list
         ManagedCursor cursor2 = ledger.openCursor("c2");
         PersistentSubscription sub2 = new PersistentSubscription(topic, 
"sub-2", cursor2, false);
         Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
-            true, serverCnx, "myrole-2", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null, MessageId.latest);
+            true, serverCnx, "myrole-2", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
         topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2);
         sub2.addConsumer(consumer2);
         // Open cursor3, add it into activeCursor-container and do not add it 
into subscription consumer list
         ManagedCursor cursor3 = ledger.openCursor("c3");
         PersistentSubscription sub3 = new PersistentSubscription(topic, 
"sub-3", cursor3, false);
         Consumer consumer3 = new Consumer(sub2, SubType.Exclusive, 
topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */,
-            true, serverCnx, "myrole-3", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null, MessageId.latest);
+            true, serverCnx, "myrole-3", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
         topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3);
 
         // Case1: cursors are active as haven't started 
deactivateBacklogCursor scan
@@ -2089,7 +2090,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         addConsumerToSubscription.setAccessible(true);
 
         Consumer consumer = new Consumer(nonDeletableSubscription1, 
SubType.Shared, topic.getName(), 1, 0, "consumer1",
-                true, serverCnx, "app1", Collections.emptyMap(), false, 
InitialPosition.Latest, null, MessageId.latest);
+                true, serverCnx, "app1", Collections.emptyMap(), false, 
InitialPosition.Latest, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         addConsumerToSubscription.invoke(topic, nonDeletableSubscription1, 
consumer);
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
@@ -2212,7 +2213,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         Consumer consumer1 = new Consumer(sub1, SubType.Key_Shared, 
topic.getName(), 1, 0, "Cons1", true, serverCnx,
                 "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest,
                 new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(false),
-                MessageId.latest);
+                MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         sub1.addConsumer(consumer1);
         consumer1.close();
 
@@ -2223,7 +2224,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         Consumer consumer2 = new Consumer(sub2, SubType.Key_Shared, 
topic.getName(), 2, 0, "Cons2", true, serverCnx,
                 "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest,
                 new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(true),
-                MessageId.latest);
+                MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         sub2.addConsumer(consumer2);
         consumer2.close();
 
@@ -2235,7 +2236,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 .setAllowOutOfOrderDelivery(false);
         ksm.addHashRange().setStart(0).setEnd(65535);
         Consumer consumer3 = new Consumer(sub3, SubType.Key_Shared, 
topic.getName(), 3, 0, "Cons3", true, serverCnx,
-                "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest, ksm, MessageId.latest);
+                "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest, ksm, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         sub3.addConsumer(consumer3);
         consumer3.close();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
index 2358e01..f3fd353c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -18,11 +18,10 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.testng.Assert.assertEquals;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -82,7 +81,7 @@ public class CompactedOutBatchMessageTest extends 
ProducerConsumerBase {
                 .subscriptionName("my-subscriber-name").subscribe()) {
             // shove it in the sideways
             consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, 
metadata, 0, null,
-                    batchBuffer, new 
MessageIdData().setLedgerId(1234).setEntryId(567), consumer.cnx());
+                    batchBuffer, new 
MessageIdData().setLedgerId(1234).setEntryId(567), consumer.cnx(), 
DEFAULT_CONSUMER_EPOCH);
             Message<?> m = consumer.receive();
             assertEquals(((BatchMessageIdImpl)m.getMessageId()).getLedgerId(), 
1234);
             assertEquals(((BatchMessageIdImpl)m.getMessageId()).getEntryId(), 
567);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
index 8b430e7..b7ff6b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
@@ -22,7 +22,7 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
-
+import java.lang.reflect.Field;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -30,10 +30,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -41,16 +41,16 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-
 import com.google.common.collect.Sets;
-
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 @Test(groups = "broker-impl")
@@ -257,4 +257,129 @@ public class MessageRedeliveryTest extends 
ProducerConsumerBase {
         assertNull(message);
     }
 
+    @Test(dataProvider = "enableBatch")
+    public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
+        final String topic = "testRedeliveryAddEpoch";
+        final String subName = "my-sub";
+        ConsumerBase<String> consumer = ((ConsumerBase<String>) 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe());
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(enableBatch)
+                .create();
+
+        String test1 = "Pulsar1";
+        String test2 = "Pulsar2";
+        String test3 = "Pulsar3";
+        producer.send(test1);
+
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopics()
+                .get(TopicName.get("persistent://public/default/" + 
topic).toString()).get().get();
+        PersistentDispatcherSingleActiveConsumer 
persistentDispatcherSingleActiveConsumer =
+                (PersistentDispatcherSingleActiveConsumer) 
persistentTopic.getSubscription(subName).getDispatcher();
+
+        consumer.setConsumerEpoch(1);
+        Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+        assertNull(message);
+        consumer.redeliverUnacknowledgedMessages();
+        message = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(message);
+        consumer.acknowledgeCumulativeAsync(message).get();
+        assertEquals(message.getValue(), test1);
+
+        consumer.setConsumerEpoch(3);
+
+        producer.send(test2);
+        message = consumer.receive(3, TimeUnit.SECONDS);
+        assertNull(message);
+
+        consumer.redeliverUnacknowledgedMessages();
+        message = consumer.receive(3, TimeUnit.SECONDS);
+        consumer.acknowledgeCumulativeAsync(message).get();
+        consumer.redeliverUnacknowledgedMessages();
+        assertNotNull(message);
+        assertEquals(message.getValue(), test2);
+
+        consumer.setConsumerEpoch(6);
+        producer.send(test3);
+        message = consumer.receive(3, TimeUnit.SECONDS);
+        assertNull(message);
+
+        Field field = 
consumer.getClass().getDeclaredField("connectionHandler");
+        field.setAccessible(true);
+        ConnectionHandler connectionHandler = (ConnectionHandler) 
field.get(consumer);
+
+        field = 
connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
+        field.setAccessible(true);
+
+        connectionHandler.cnx().channel().close();
+
+        ((ConsumerImpl<String>) consumer).grabCnx();
+        message = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(message);
+        assertEquals(message.getValue(), test3);
+    }
+
+    @DataProvider(name = "enableBatch")
+    public static Object[][] enableBatch() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+
+    @Test(dataProvider = "enableBatch")
+    public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) 
throws Exception{
+        final String topic = "testMultiConsumerRedeliveryAddEpoch";
+        final String subName = "my-sub";
+        admin.topics().createPartitionedTopic(topic, 5);
+        final int messageNumber = 50;
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(enableBatch)
+                .create();
+
+        for (int i = 0; i < messageNumber; i++) {
+            producer.send("" + i);
+        }
+
+        for (int i = 0; i < messageNumber; i++) {
+            consumer.receive();
+        }
+
+        // redeliverUnacknowledgedMessages once
+        consumer.redeliverUnacknowledgedMessages();
+
+        Message<String> message;
+        for (int i = 0; i < messageNumber; i++) {
+            message = consumer.receive();
+            // message consumer epoch is 1
+            assertEquals((((MessageImpl)((TopicMessageImpl) 
message).getMessage())).getConsumerEpoch(), 1);
+        }
+
+        // can't receive message again
+        message = consumer.receive(5, TimeUnit.SECONDS);
+        assertNull(message);
+
+        // redeliverUnacknowledgedMessages twice
+        consumer.redeliverUnacknowledgedMessages();
+
+        for (int i = 0; i < messageNumber; i++) {
+            message = consumer.receive();
+            // message consumer epoch is 2
+            assertEquals((((MessageImpl)((TopicMessageImpl) 
message).getMessage())).getConsumerEpoch(), 2);
+        }
+
+        // can't receive message again
+        message = consumer.receive(5, TimeUnit.SECONDS);
+        assertNull(message);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 934cff6..e7df294 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -37,7 +37,6 @@ import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
@@ -448,15 +447,7 @@ public class ClientCnx extends PulsarHandler {
         }
         ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
         if (consumer != null) {
-            List<Long> ackSets = Collections.emptyList();
-            if (cmdMessage.getAckSetsCount() > 0) {
-                ackSets = new ArrayList<>(cmdMessage.getAckSetsCount());
-                for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
-                    ackSets.add(cmdMessage.getAckSetAt(i));
-                }
-            }
-            consumer.messageReceived(cmdMessage.getMessageId(), 
cmdMessage.getRedeliveryCount(), ackSets,
-                    headersAndPayload, this);
+            consumer.messageReceived(cmdMessage, headersAndPayload, this);
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 242bb9f..7f0575a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.google.common.collect.Queues;
 import io.netty.util.Timeout;
 import java.nio.charset.StandardCharsets;
@@ -37,6 +38,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerEventListener;
@@ -53,6 +56,7 @@ import 
org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -86,6 +90,13 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final Lock reentrantLock = new ReentrantLock();
     private volatile boolean isListenerHandlingMessage = false;
 
+    protected static final AtomicLongFieldUpdater<ConsumerBase> CONSUMER_EPOCH 
=
+            AtomicLongFieldUpdater.newUpdater(ConsumerBase.class, 
"consumerEpoch");
+
+    @Setter
+    @Getter
+    protected volatile long consumerEpoch;
+
     protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorProvider 
executorProvider,
                            CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema,
@@ -1054,5 +1065,22 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls 
redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
+        if ((getSubType() == CommandSubscribe.SubType.Failover
+                || getSubType() == CommandSubscribe.SubType.Exclusive)
+                && message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH
+                && message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) {
+            log.warn("Consumer filter old epoch message, topic : [{}], 
messageId : [{}], consumerEpoch : [{}]",
+                    topic, message.getMessageId(), consumerEpoch);
+            message.release();
+            message.recycle();
+            return false;
+        }
+        return true;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerBase.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 60f9ca7..e09a934 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.collect.ComparisonChain;
@@ -86,6 +87,7 @@ import 
org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
+import org.apache.pulsar.common.api.proto.CommandMessage;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CompressionType;
 import org.apache.pulsar.common.api.proto.EncryptionKeys;
@@ -196,7 +198,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final AtomicReference<ClientCnx> 
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<Throwable>();
-
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
                                                ConsumerConfigurationData<T> 
conf,
@@ -419,6 +420,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         try {
             message = incomingMessages.take();
             messageProcessed(message);
+            if (!isValidConsumerEpoch(message)) {
+                return internalReceive();
+            }
             return beforeConsume(message);
         } catch (InterruptedException e) {
             stats.incrementNumReceiveFailed();
@@ -426,6 +430,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     }
 
+    private boolean isValidConsumerEpoch(Message<T> message) {
+        return isValidConsumerEpoch((MessageImpl<T>) message);
+    }
+
     @Override
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
@@ -437,6 +445,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 cancellationHandler.setCancelAction(() -> 
pendingReceives.remove(result));
             } else {
                 messageProcessed(message);
+                if (!isValidConsumerEpoch(message)) {
+                    pendingReceives.add(result);
+                    cancellationHandler.setCancelAction(() -> 
pendingReceives.remove(result));
+                    return;
+                }
                 result.complete(beforeConsume(message));
             }
         });
@@ -447,12 +460,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     @Override
     protected Message<T> internalReceive(int timeout, TimeUnit unit) throws 
PulsarClientException {
         Message<T> message;
+        long callTime = System.nanoTime();
         try {
             message = incomingMessages.poll(timeout, unit);
             if (message == null) {
                 return null;
             }
             messageProcessed(message);
+            if (!isValidConsumerEpoch(message)) {
+                long executionTime = System.nanoTime() - callTime;
+                if (executionTime >= unit.toNanos(timeout)) {
+                    return null;
+                } else {
+                    return internalReceive((int) (timeout - executionTime), 
TimeUnit.NANOSECONDS);
+                }
+            }
             return beforeConsume(message);
         } catch (InterruptedException e) {
             State state = getState();
@@ -492,6 +514,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     Message<T> msg = incomingMessages.poll();
                     if (msg != null) {
                         messageProcessed(msg);
+                        if (!isValidConsumerEpoch(msg)) {
+                            msgPeeked = incomingMessages.peek();
+                            continue;
+                        }
                         Message<T> interceptMsg = beforeConsume(msg);
                         messages.add(interceptMsg);
                     }
@@ -736,7 +762,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             clearReceiverQueue();
             return;
         }
-        setClientCnx(cnx);
 
         log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
                 topic, subscription, cnx.ctx().channel(), consumerId);
@@ -747,8 +772,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         SUBSCRIBE_DEADLINE_UPDATER
-            .compareAndSet(this, 0L,
-                    System.currentTimeMillis() + 
client.getConfiguration().getOperationTimeoutMs());
+                .compareAndSet(this, 0L, System.currentTimeMillis()
+                        + client.getConfiguration().getOperationTimeoutMs());
 
         int currentSize;
         synchronized (this) {
@@ -780,77 +805,86 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
         // startMessageRollbackDurationInSec should be consider only once when 
consumer connects to first time
         long startMessageRollbackDuration = (startMessageRollbackDurationInSec 
> 0
-                && startMessageId != null && 
startMessageId.equals(initialStartMessageId))
-                ? startMessageRollbackDurationInSec : 0;
-        ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, 
readCompacted,
-                conf.isReplicateSubscriptionState(), 
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
-                startMessageRollbackDuration, si, createTopicIfDoesNotExist, 
conf.getKeySharedPolicy(),
-                conf.getSubscriptionProperties());
-
-        cnx.sendRequestWithId(request, requestId).thenRun(() -> {
-            synchronized (ConsumerImpl.this) {
-                if (changeToReadyState()) {
-                    consumerIsReconnectedToBroker(cnx, currentSize);
-                } else {
+                && startMessageId != null
+                && startMessageId.equals(initialStartMessageId)) ? 
startMessageRollbackDurationInSec : 0;
+
+        // synchronized this, because redeliverUnAckMessage eliminate the 
epoch inconsistency between them
+        synchronized (this) {
+            setClientCnx(cnx);
+            ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(),
+                    priorityLevel, consumerName, isDurable, 
startMessageIdData, metadata, readCompacted,
+                    conf.isReplicateSubscriptionState(),
+                    
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
+                    startMessageRollbackDuration, si, 
createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
+                    // Use the current epoch to subscribe.
+                    conf.getSubscriptionProperties(), 
CONSUMER_EPOCH.get(this));
+
+            cnx.sendRequestWithId(request, requestId).thenRun(() -> {
+                synchronized (ConsumerImpl.this) {
+                    if (changeToReadyState()) {
+                        consumerIsReconnectedToBroker(cnx, currentSize);
+                    } else {
+                        // Consumer was closed while reconnecting, close the 
connection to make sure the broker
+                        // drops the consumer on its side
+                        setState(State.Closed);
+                        deregisterFromClientCnx();
+                        client.cleanupConsumer(this);
+                        cnx.channel().close();
+                        return;
+                    }
+                }
+
+                resetBackoff();
+
+                boolean firstTimeConnect = subscribeFuture.complete(this);
+                // if the consumer is not partitioned or is re-connected and 
is partitioned, we send the flow
+                // command to receive messages.
+                if (!(firstTimeConnect && hasParentConsumer) && 
conf.getReceiverQueueSize() != 0) {
+                    increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
+                }
+            }).exceptionally((e) -> {
+                deregisterFromClientCnx();
+                if (getState() == State.Closing || getState() == State.Closed) 
{
                     // Consumer was closed while reconnecting, close the 
connection to make sure the broker
                     // drops the consumer on its side
-                    setState(State.Closed);
-                    deregisterFromClientCnx();
-                    client.cleanupConsumer(this);
                     cnx.channel().close();
-                    return;
+                    return null;
+                }
+                log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
+                        subscription, cnx.channel().remoteAddress());
+
+                if (e.getCause() instanceof PulsarClientException
+                        && PulsarClientException.isRetriableError(e.getCause())
+                        && System.currentTimeMillis() < 
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
+                    reconnectLater(e.getCause());
+                } else if (!subscribeFuture.isDone()) {
+                    // unable to create new consumer, fail operation
+                    setState(State.Failed);
+                    closeConsumerTasks();
+                    subscribeFuture.completeExceptionally(
+                            PulsarClientException.wrap(e, 
String.format("Failed to subscribe the topic %s "
+                                            + "with subscription name %s when 
connecting to the broker",
+                                    topicName.toString(), subscription)));
+                    client.cleanupConsumer(this);
+                } else if (e.getCause() instanceof TopicDoesNotExistException) 
{
+                    // The topic was deleted after the consumer was created, 
and we're
+                    // not allowed to recreate the topic. This can happen in 
few cases:
+                    //  * Regex consumer getting error after topic gets deleted
+                    //  * Regular consumer after topic is manually delete and 
with
+                    //    auto-topic-creation set to false
+                    // No more retries are needed in this case.
+                    setState(State.Failed);
+                    closeConsumerTasks();
+                    client.cleanupConsumer(this);
+                    log.warn("[{}][{}] Closed consumer because topic does not 
exist anymore {}",
+                            topic, subscription, 
cnx.channel().remoteAddress());
+                } else {
+                    // consumer was subscribed and connected but we got some 
error, keep trying
+                    reconnectLater(e.getCause());
                 }
-            }
-
-            resetBackoff();
-
-            boolean firstTimeConnect = subscribeFuture.complete(this);
-            // if the consumer is not partitioned or is re-connected and is 
partitioned, we send the flow
-            // command to receive messages.
-            if (!(firstTimeConnect && hasParentConsumer) && 
conf.getReceiverQueueSize() != 0) {
-                increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
-            }
-        }).exceptionally((e) -> {
-            deregisterFromClientCnx();
-            if (getState() == State.Closing || getState() == State.Closed) {
-                // Consumer was closed while reconnecting, close the 
connection to make sure the broker
-                // drops the consumer on its side
-                cnx.channel().close();
                 return null;
-            }
-            log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, 
subscription, cnx.channel().remoteAddress());
-
-            if (e.getCause() instanceof PulsarClientException
-                    && PulsarClientException.isRetriableError(e.getCause())
-                    && System.currentTimeMillis() < 
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
-                reconnectLater(e.getCause());
-            } else if (!subscribeFuture.isDone()) {
-                // unable to create new consumer, fail operation
-                setState(State.Failed);
-                closeConsumerTasks();
-                subscribeFuture.completeExceptionally(
-                    PulsarClientException.wrap(e, String.format("Failed to 
subscribe the topic %s with subscription "
-                            + "name %s when connecting to the broker", 
topicName.toString(), subscription)));
-                client.cleanupConsumer(this);
-            } else if (e.getCause() instanceof TopicDoesNotExistException) {
-                // The topic was deleted after the consumer was created, and 
we're
-                // not allowed to recreate the topic. This can happen in few 
cases:
-                //  * Regex consumer getting error after topic gets deleted
-                //  * Regular consumer after topic is manually delete and with
-                //    auto-topic-creation set to false
-                // No more retries are needed in this case.
-                setState(State.Failed);
-                closeConsumerTasks();
-                client.cleanupConsumer(this);
-                log.warn("[{}][{}] Closed consumer because topic does not 
exist anymore {}",
-                        topic, subscription, cnx.channel().remoteAddress());
-            } else {
-                // consumer was subscribed and connected but we got some 
error, keep trying
-                reconnectLater(e.getCause());
-            }
-            return null;
-        });
+            });
+        }
     }
 
     protected void consumerIsReconnectedToBroker(ClientCnx cnx, int 
currentQueueSize) {
@@ -1057,7 +1091,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                                   final boolean 
containMetadata,
                                                   final BitSetRecyclable 
ackBitSet,
                                                   final BatchMessageAcker 
acker,
-                                                  final int redeliveryCount) {
+                                                  final int redeliveryCount,
+                                                  final long consumerEpoch) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] processing message num - {} in batch", 
subscription, consumerName, index);
         }
@@ -1095,8 +1130,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             final ByteBuf payloadBuffer = (singleMessagePayload != null) ? 
singleMessagePayload : payload;
             final MessageImpl<V> message = 
MessageImpl.create(topicName.toString(), batchMessageIdImpl,
                     msgMetadata, singleMessageMetadata, payloadBuffer,
-                    createEncryptionContext(msgMetadata), cnx(), schema, 
redeliveryCount, poolMessages
-            );
+                    createEncryptionContext(msgMetadata), cnx(), schema, 
redeliveryCount, poolMessages, consumerEpoch);
             message.setBrokerEntryMetadata(brokerEntryMetadata);
             return message;
         } catch (IOException | IllegalStateException e) {
@@ -1113,10 +1147,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                             final MessageMetadata 
messageMetadata,
                                             final ByteBuf payload,
                                             final Schema<V> schema,
-                                            final int redeliveryCount) {
+                                            final int redeliveryCount,
+                                            final long consumerEpoch) {
         final MessageImpl<V> message = 
MessageImpl.create(topicName.toString(), messageId, messageMetadata, payload,
-                createEncryptionContext(messageMetadata), cnx(), schema, 
redeliveryCount, poolMessages
-        );
+                createEncryptionContext(messageMetadata), cnx(), schema, 
redeliveryCount, poolMessages, consumerEpoch);
         message.setBrokerEntryMetadata(brokerEntryMetadata);
         return message;
     }
@@ -1140,10 +1174,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                            final MessageIdImpl messageId,
                                            final Schema<T> schema,
                                            final int redeliveryCount,
-                                           final List<Long> ackSet) {
+                                           final List<Long> ackSet,
+                                           long consumerEpoch) {
         final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
         final MessagePayloadContextImpl entryContext = 
MessagePayloadContextImpl.get(
-                brokerEntryMetadata, messageMetadata, messageId, this, 
redeliveryCount, ackSet);
+                brokerEntryMetadata, messageMetadata, messageId, this, 
redeliveryCount, ackSet, consumerEpoch);
         final AtomicInteger skippedMessages = new AtomicInteger(0);
         try {
             conf.getPayloadProcessor().process(payload, entryContext, schema, 
message -> {
@@ -1168,8 +1203,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         tryTriggerListener();
     }
 
-    void messageReceived(MessageIdData messageId, int redeliveryCount, 
List<Long> ackSet, ByteBuf headersAndPayload,
-                         ClientCnx cnx) {
+    void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, 
ClientCnx cnx) {
+        List<Long> ackSet = Collections.emptyList();
+        if (cmdMessage.getAckSetsCount() > 0) {
+            ackSet = new ArrayList<>(cmdMessage.getAckSetsCount());
+            for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
+                ackSet.add(cmdMessage.getAckSetAt(i));
+            }
+        }
+        int redeliveryCount = cmdMessage.getRedeliveryCount();
+        MessageIdData messageId = cmdMessage.getMessageId();
+        long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+        // if broker send messages to client with consumerEpoch, we should set 
consumerEpoch to message
+        if (cmdMessage.hasConsumerEpoch()) {
+            consumerEpoch = cmdMessage.getConsumerEpoch();
+        }
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Received message: {}/{}", topic, subscription, 
messageId.getLedgerId(),
                     messageId.getEntryId());
@@ -1227,8 +1275,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         if (conf.getPayloadProcessor() != null) {
             // uncompressedPayload is released in this method so we don't need 
to call release() again
-            processPayloadByProcessor(
-                    brokerEntryMetadata, msgMetadata, uncompressedPayload, 
msgId, schema, redeliveryCount, ackSet);
+            processPayloadByProcessor(brokerEntryMetadata, msgMetadata,
+                    uncompressedPayload, msgId, schema, redeliveryCount, 
ackSet, consumerEpoch);
             return;
         }
 
@@ -1275,7 +1323,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
 
             final MessageImpl<T> message =
-                    newMessage(msgId, brokerEntryMetadata, msgMetadata, 
uncompressedPayload, schema, redeliveryCount);
+                    newMessage(msgId, brokerEntryMetadata, msgMetadata, 
uncompressedPayload,
+                            schema, redeliveryCount, consumerEpoch);
             uncompressedPayload.release();
 
             if (deadLetterPolicy != null && 
possibleSendToDeadLetterTopicMessages != null
@@ -1287,7 +1336,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         } else {
             // handle batch message enqueuing; uncompressed payload has all 
messages in batch
             receiveIndividualMessagesFromBatch(brokerEntryMetadata, 
msgMetadata, redeliveryCount, ackSet,
-                    uncompressedPayload, messageId, cnx);
+                    uncompressedPayload, messageId, cnx, consumerEpoch);
 
             uncompressedPayload.release();
         }
@@ -1422,7 +1471,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     void receiveIndividualMessagesFromBatch(BrokerEntryMetadata 
brokerEntryMetadata, MessageMetadata msgMetadata,
                                             int redeliveryCount, List<Long> 
ackSet, ByteBuf uncompressedPayload,
-                                            MessageIdData messageId, ClientCnx 
cnx) {
+                                            MessageIdData messageId, ClientCnx 
cnx, long consumerEpoch) {
         int batchSize = msgMetadata.getNumMessagesInBatch();
 
         // create ack tracker for entry aka batch
@@ -1444,8 +1493,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         try {
             for (int i = 0; i < batchSize; ++i) {
                 final MessageImpl<T> message = newSingleMessage(i, batchSize, 
brokerEntryMetadata, msgMetadata,
-                        singleMessageMetadata, uncompressedPayload, 
batchMessage, schema, true, ackBitSet, acker,
-                        redeliveryCount);
+                        singleMessageMetadata, uncompressedPayload, 
batchMessage, schema, true,
+                        ackBitSet, acker, redeliveryCount, consumerEpoch);
                 if (message == null) {
                     skippedMessages++;
                     continue;
@@ -1723,6 +1772,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return getClientCnx() != null && (getState() == State.Ready);
     }
 
+    public boolean isConnected(ClientCnx cnx) {
+        return cnx != null && (getState() == State.Ready);
+    }
+
     int getPartitionIndex() {
         return partitionIndex;
     }
@@ -1739,29 +1792,54 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        ClientCnx cnx = cnx();
-        if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v2.getValue()) {
-            int currentSize = 0;
-            synchronized (this) {
-                currentSize = incomingMessages.size();
-                clearIncomingMessages();
-                unAckedMessageTracker.clear();
-            }
-            
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId),
 cnx.ctx().voidPromise());
-            if (currentSize > 0) {
-                increaseAvailablePermits(cnx, currentSize);
+        // First : synchronized in order to handle consumer reconnect produce 
race condition, when broker receive
+        // redeliverUnacknowledgedMessages and consumer have not be created and
+        // then receive reconnect epoch change the broker is smaller than the 
client epoch, this will cause client epoch
+        // smaller than broker epoch forever. client will not receive message 
anymore.
+        // Second : we should synchronized `ClientCnx cnx = cnx()` to
+        // prevent use old cnx to send redeliverUnacknowledgedMessages to a 
old broker
+        synchronized (ConsumerImpl.this) {
+            ClientCnx cnx = cnx();
+            // V1 don't support redeliverUnacknowledgedMessages
+            if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v2.getValue()) {
+                if ((getState() == State.Connecting)) {
+                    log.warn("[{}] Client Connection needs to be established "
+                            + "for redelivery of unacknowledged messages", 
this);
+                } else {
+                    log.warn("[{}] Reconnecting the client to redeliver the 
messages.", this);
+                    cnx.ctx().close();
+                }
+
+                return;
             }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] [{}] Redeliver unacked messages and send 
{} permits", subscription, topic,
-                        consumerName, currentSize);
+
+            // clear local message
+            int currentSize = 0;
+            currentSize = incomingMessages.size();
+            clearIncomingMessages();
+            unAckedMessageTracker.clear();
+
+            // we should increase epoch every time, because 
MultiTopicsConsumerImpl also increase it,
+            // we need to keep both epochs the same
+            if (conf.getSubscriptionType() == SubscriptionType.Failover
+                    || conf.getSubscriptionType() == 
SubscriptionType.Exclusive) {
+                CONSUMER_EPOCH.incrementAndGet(this);
+            }
+            // is channel is connected, we should send redeliver command to 
broker
+            if (cnx != null && isConnected(cnx)) {
+                
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
+                        consumerId, CONSUMER_EPOCH.get(this)), 
cnx.ctx().voidPromise());
+                if (currentSize > 0) {
+                    increaseAvailablePermits(cnx, currentSize);
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] [{}] Redeliver unacked messages and 
send {} permits", subscription, topic,
+                            consumerName, currentSize);
+                }
+            } else {
+                log.warn("[{}] Send redeliver messages command but the client 
is reconnect or close, "
+                        + "so don't need to send redeliver command to broker", 
this);
             }
-            return;
-        }
-        if (cnx == null || (getState() == State.Connecting)) {
-            log.warn("[{}] Client Connection needs to be established for 
redelivery of unacknowledged messages", this);
-        } else {
-            log.warn("[{}] Reconnecting the client to redeliver the 
messages.", this);
-            cnx.ctx().close();
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 7fcc720..a184bd1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -35,6 +36,7 @@ import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import lombok.Getter;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
@@ -72,7 +74,8 @@ public class MessageImpl<T> implements Message<T> {
     private BrokerEntryMetadata brokerEntryMetadata;
 
     private boolean poolMessage;
-
+    @Getter
+    private long consumerEpoch;
     // Constructor for out-going message
     public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, 
ByteBuffer payload, Schema<T> schema,
             String topic) {
@@ -98,75 +101,77 @@ public class MessageImpl<T> implements Message<T> {
 
     MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata 
msgMetadata, ByteBuf payload,
                 Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, 
Schema<T> schema) {
-        this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, 
schema, 0, false);
+        this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, 
schema, 0, false, DEFAULT_CONSUMER_EPOCH);
     }
 
     MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata 
msgMetadata, ByteBuf payload,
                 Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, 
Schema<T> schema, int redeliveryCount,
-                boolean pooledMessage) {
+                boolean pooledMessage, long consumerEpoch) {
         this.msgMetadata = new MessageMetadata();
-        init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, 
schema, redeliveryCount, pooledMessage);
+        init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
+                schema, redeliveryCount, pooledMessage, consumerEpoch);
     }
 
     public static <T> MessageImpl<T> create(String topic, MessageIdImpl 
messageId, MessageMetadata msgMetadata,
             ByteBuf payload, Optional<EncryptionContext> encryptionCtx, 
ClientCnx cnx, Schema<T> schema,
-            int redeliveryCount, boolean pooledMessage) {
+            int redeliveryCount, boolean pooledMessage, long consumerEpoch) {
         if (pooledMessage) {
             @SuppressWarnings("unchecked")
             MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
             init(msg, topic, messageId, msgMetadata, payload, encryptionCtx, 
cnx, schema, redeliveryCount,
-                    pooledMessage);
+                    pooledMessage, consumerEpoch);
             return msg;
         } else {
             return new MessageImpl<>(topic, messageId, msgMetadata, payload, 
encryptionCtx, cnx, schema,
-                    redeliveryCount, pooledMessage);
+                    redeliveryCount, pooledMessage, consumerEpoch);
         }
     }
 
     MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, 
MessageMetadata msgMetadata,
             SingleMessageMetadata singleMessageMetadata, ByteBuf payload, 
Optional<EncryptionContext> encryptionCtx,
-            ClientCnx cnx, Schema<T> schema) {
+            ClientCnx cnx, Schema<T> schema, long consumerEpoch) {
         this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, 
payload, encryptionCtx, cnx, schema, 0,
-                false);
+                false, consumerEpoch);
     }
 
     MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, 
MessageMetadata batchMetadata,
             SingleMessageMetadata singleMessageMetadata, ByteBuf payload, 
Optional<EncryptionContext> encryptionCtx,
-            ClientCnx cnx, Schema<T> schema, int redeliveryCount, boolean 
keepMessageInDirectMemory) {
+            ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+                boolean keepMessageInDirectMemory, long consumerEpoch) {
         this.msgMetadata = new MessageMetadata();
-        init(this, topic, batchMessageIdImpl, batchMetadata, 
singleMessageMetadata, payload, encryptionCtx, cnx, schema,
-                redeliveryCount, keepMessageInDirectMemory);
+        init(this, topic, batchMessageIdImpl, batchMetadata, 
singleMessageMetadata, payload, encryptionCtx,
+                cnx, schema, redeliveryCount, keepMessageInDirectMemory, 
consumerEpoch);
 
     }
 
     public static <T> MessageImpl<T> create(String topic, BatchMessageIdImpl 
batchMessageIdImpl,
             MessageMetadata batchMetadata, SingleMessageMetadata 
singleMessageMetadata, ByteBuf payload,
             Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, 
Schema<T> schema, int redeliveryCount,
-            boolean pooledMessage) {
+            boolean pooledMessage, long consumerEpoch) {
         if (pooledMessage) {
             @SuppressWarnings("unchecked")
             MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
             init(msg, topic, batchMessageIdImpl, batchMetadata, 
singleMessageMetadata, payload, encryptionCtx, cnx,
-                    schema, redeliveryCount, pooledMessage);
+                    schema, redeliveryCount, pooledMessage, consumerEpoch);
             return msg;
         } else {
             return new MessageImpl<>(topic, batchMessageIdImpl, batchMetadata, 
singleMessageMetadata, payload,
-                    encryptionCtx, cnx, schema, redeliveryCount, 
pooledMessage);
+                    encryptionCtx, cnx, schema, redeliveryCount, 
pooledMessage, consumerEpoch);
         }
     }
 
     static <T> void init(MessageImpl<T> msg, String topic, MessageIdImpl 
messageId, MessageMetadata msgMetadata,
             ByteBuf payload, Optional<EncryptionContext> encryptionCtx, 
ClientCnx cnx, Schema<T> schema,
-            int redeliveryCount, boolean poolMessage) {
+            int redeliveryCount, boolean poolMessage, long consumerEpoch) {
         init(msg, topic, null /* batchMessageIdImpl */, msgMetadata, null /* 
singleMessageMetadata */, payload,
-                encryptionCtx, cnx, schema, redeliveryCount, poolMessage);
+                encryptionCtx, cnx, schema, redeliveryCount, poolMessage, 
consumerEpoch);
         msg.messageId = messageId;
     }
 
     private static <T> void init(MessageImpl<T> msg, String topic, 
BatchMessageIdImpl batchMessageIdImpl,
             MessageMetadata msgMetadata, SingleMessageMetadata 
singleMessageMetadata, ByteBuf payload,
             Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, 
Schema<T> schema, int redeliveryCount,
-            boolean poolMessage) {
+            boolean poolMessage, long consumerEpoch) {
         msg.msgMetadata.clear();
         msg.msgMetadata.copyFrom(msgMetadata);
         msg.messageId = batchMessageIdImpl;
@@ -175,6 +180,7 @@ public class MessageImpl<T> implements Message<T> {
         msg.redeliveryCount = redeliveryCount;
         msg.encryptionCtx = encryptionCtx;
         msg.schema = schema;
+        msg.consumerEpoch = consumerEpoch;
 
         msg.poolMessage = poolMessage;
         // If it's not pool message then need to make a copy since the passed 
payload is
@@ -289,6 +295,7 @@ public class MessageImpl<T> implements Message<T> {
         msg.topic = null;
         msg.cnx = null;
         msg.brokerEntryMetadata = brokerEntryMetadata;
+        msg.consumerEpoch = DEFAULT_CONSUMER_EPOCH;
         return msg;
     }
 
@@ -636,6 +643,7 @@ public class MessageImpl<T> implements Message<T> {
         schema = null;
         schemaState = SchemaState.None;
         poolMessage = false;
+        consumerEpoch = DEFAULT_CONSUMER_EPOCH;
 
         if (recyclerHandle != null) {
             recyclerHandle.recycle(this);
@@ -686,6 +694,7 @@ public class MessageImpl<T> implements Message<T> {
         this.redeliveryCount = 0;
         this.msgMetadata = new MessageMetadata();
         this.brokerEntryMetadata = new BrokerEntryMetadata();
+        this.consumerEpoch = DEFAULT_CONSUMER_EPOCH;
     }
 
     private Handle<MessageImpl<?>> recyclerHandle;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
index 443c79b..c4eb264 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import java.util.List;
@@ -51,6 +52,7 @@ public class MessagePayloadContextImpl implements 
MessagePayloadContext {
     private int redeliveryCount;
     private BatchMessageAcker acker;
     private BitSetRecyclable ackBitSet;
+    private long consumerEpoch;
 
     private MessagePayloadContextImpl(final 
Recycler.Handle<MessagePayloadContextImpl> handle) {
         this.recyclerHandle = handle;
@@ -61,8 +63,10 @@ public class MessagePayloadContextImpl implements 
MessagePayloadContext {
                                                 @NonNull final MessageIdImpl 
messageId,
                                                 @NonNull final ConsumerImpl<?> 
consumer,
                                                 final int redeliveryCount,
-                                                final List<Long> ackSet) {
+                                                final List<Long> ackSet,
+                                                final long consumerEpoch) {
         final MessagePayloadContextImpl context = RECYCLER.get();
+        context.consumerEpoch = consumerEpoch;
         context.brokerEntryMetadata = brokerEntryMetadata;
         context.messageMetadata = messageMetadata;
         context.singleMessageMetadata = new SingleMessageMetadata();
@@ -83,6 +87,7 @@ public class MessagePayloadContextImpl implements 
MessagePayloadContext {
         messageId = null;
         consumer = null;
         redeliveryCount = 0;
+        consumerEpoch = DEFAULT_CONSUMER_EPOCH;
         acker = null;
         if (ackBitSet != null) {
             ackBitSet.recycle();
@@ -130,7 +135,8 @@ public class MessagePayloadContextImpl implements 
MessagePayloadContext {
                     containMetadata,
                     ackBitSet,
                     acker,
-                    redeliveryCount);
+                    redeliveryCount,
+                    consumerEpoch);
         } finally {
             payloadBuffer.release();
         }
@@ -140,8 +146,8 @@ public class MessagePayloadContextImpl implements 
MessagePayloadContext {
     public <T> Message<T> asSingleMessage(MessagePayload payload, Schema<T> 
schema) {
         final ByteBuf payloadBuffer = 
MessagePayloadUtils.convertToByteBuf(payload);
         try {
-            return consumer.newMessage(
-                    messageId, brokerEntryMetadata, messageMetadata, 
payloadBuffer, schema, redeliveryCount);
+            return consumer.newMessage(messageId, brokerEntryMetadata,
+                    messageMetadata, payloadBuffer, schema, redeliveryCount, 
consumerEpoch);
         } finally {
             payloadBuffer.release();
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a3e85f2..dc58ec4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -106,7 +106,6 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     private volatile BatchMessageIdImpl startMessageId = null;
     private final long startMessageRollbackDurationInSec;
-
     MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf,
             ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> 
subscribeFuture, Schema<T> schema,
             ConsumerInterceptors<T> interceptors, boolean 
createTopicIfDoesNotExist) {
@@ -334,6 +333,14 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls 
redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    private boolean isValidConsumerEpoch(Message<T> message) {
+        return isValidConsumerEpoch(((MessageImpl<T>) (((TopicMessageImpl<T>) 
message))
+                .getMessage()));
+    }
+
     @Override
     protected Message<T> internalReceive() throws PulsarClientException {
         Message<T> message;
@@ -341,6 +348,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             message = incomingMessages.take();
             decreaseIncomingMessageSize(message);
             checkState(message instanceof TopicMessageImpl);
+            if (!isValidConsumerEpoch(message)) {
+                resumeReceivingFromPausedConsumersIfNeeded();
+                message.release();
+                return internalReceive();
+            }
             unAckedMessageTracker.add(message.getMessageId(), 
message.getRedeliveryCount());
             resumeReceivingFromPausedConsumersIfNeeded();
             return message;
@@ -352,11 +364,22 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     @Override
     protected Message<T> internalReceive(int timeout, TimeUnit unit) throws 
PulsarClientException {
         Message<T> message;
+
+        long callTime = System.nanoTime();
         try {
             message = incomingMessages.poll(timeout, unit);
             if (message != null) {
                 decreaseIncomingMessageSize(message);
                 checkArgument(message instanceof TopicMessageImpl);
+                if (!isValidConsumerEpoch(message)) {
+                    long executionTime = System.nanoTime() - callTime;
+                    if (executionTime >= unit.toNanos(timeout)) {
+                        return null;
+                    } else {
+                        resumeReceivingFromPausedConsumersIfNeeded();
+                        return internalReceive((int) (timeout - 
executionTime), TimeUnit.NANOSECONDS);
+                    }
+                }
                 unAckedMessageTracker.add(message.getMessageId(), 
message.getRedeliveryCount());
             }
             resumeReceivingFromPausedConsumersIfNeeded();
@@ -394,6 +417,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     Message<T> msg = incomingMessages.poll();
                     if (msg != null) {
                         decreaseIncomingMessageSize(msg);
+                        if (!isValidConsumerEpoch(msg)) {
+                            msgPeeked = incomingMessages.peek();
+                            continue;
+                        }
                         Message<T> interceptMsg = beforeConsume(msg);
                         messages.add(interceptMsg);
                     }
@@ -653,6 +680,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     public void redeliverUnacknowledgedMessages() {
         lock.writeLock().lock();
         try {
+            CONSUMER_EPOCH.incrementAndGet(this);
             consumers.values().stream().forEach(consumer -> {
                 consumer.redeliverUnacknowledgedMessages();
                 consumer.unAckedChunkedMessageIdSequenceMap.clear();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index eb84faa..fe6b929 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -177,7 +177,7 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
     @Override
     void receiveIndividualMessagesFromBatch(BrokerEntryMetadata 
brokerEntryMetadata, MessageMetadata msgMetadata,
                                             int redeliveryCount, List<Long> 
ackSet, ByteBuf uncompressedPayload,
-                                            MessageIdData messageId, ClientCnx 
cnx) {
+                                            MessageIdData messageId, ClientCnx 
cnx, long consumerEpoch) {
         log.warn(
                 "Closing consumer [{}]-[{}] due to unsupported received 
batch-message with zero receiver queue size",
                 subscription, consumerName);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 12bfba1..95b21fd 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -115,6 +115,10 @@ public class Commands {
     public static final int MESSAGE_SIZE_FRAME_PADDING = 10 * 1024;
     public static final int INVALID_MAX_MESSAGE_SIZE = -1;
 
+    // this present broker version don't have consumerEpoch feature,
+    // so client don't need to think about consumerEpoch feature
+    public static final long DEFAULT_CONSUMER_EPOCH = -1L;
+
     @SuppressWarnings("checkstyle:ConstantName")
     public static final short magicCrc32c = 0x0e01;
     @SuppressWarnings("checkstyle:ConstantName")
@@ -459,7 +463,7 @@ public class Commands {
     }
 
     public static BaseCommand newMessageCommand(long consumerId, long 
ledgerId, long entryId, int partition,
-            int redeliveryCount, long[] ackSet) {
+            int redeliveryCount, long[] ackSet, long consumerEpoch) {
         BaseCommand cmd = localCmd(Type.MESSAGE);
         CommandMessage msg = cmd.setMessage()
                 .setConsumerId(consumerId);
@@ -467,6 +471,11 @@ public class Commands {
                 .setLedgerId(ledgerId)
                 .setEntryId(entryId)
                 .setPartition(partition);
+
+        // consumerEpoch > -1 is useful
+        if (consumerEpoch > DEFAULT_CONSUMER_EPOCH) {
+            msg.setConsumerEpoch(consumerEpoch);
+        }
         if (redeliveryCount > 0) {
             msg.setRedeliveryCount(redeliveryCount);
         }
@@ -481,8 +490,8 @@ public class Commands {
     public static ByteBufPair newMessage(long consumerId, long ledgerId, long 
entryId, int partition,
             int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet) {
         return serializeCommandMessageWithSize(
-                newMessageCommand(consumerId, ledgerId, entryId, partition, 
redeliveryCount, ackSet),
-                metadataAndPayload);
+                newMessageCommand(consumerId, ledgerId, entryId, partition, 
redeliveryCount, ackSet,
+                        DEFAULT_CONSUMER_EPOCH), metadataAndPayload);
     }
 
     public static ByteBufPair newSend(long producerId, long sequenceId, int 
numMessaegs, ChecksumType checksumType,
@@ -547,7 +556,7 @@ public class Commands {
         return newSubscribe(topic, subscription, consumerId, requestId, 
subType, priorityLevel, consumerName,
                 isDurable, startMessageId, metadata, readCompacted, 
isReplicated, subscriptionInitialPosition,
                 startMessageRollbackDurationInSec, schemaInfo, 
createTopicIfDoesNotExist, null,
-                Collections.emptyMap());
+                Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long 
consumerId, long requestId,
@@ -555,7 +564,7 @@ public class Commands {
                Map<String, String> metadata, boolean readCompacted, boolean 
isReplicated,
                InitialPosition subscriptionInitialPosition, long 
startMessageRollbackDurationInSec,
                SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, 
KeySharedPolicy keySharedPolicy,
-               Map<String, String> subscriptionProperties) {
+               Map<String, String> subscriptionProperties, long consumerEpoch) 
{
         BaseCommand cmd = localCmd(Type.SUBSCRIBE);
         CommandSubscribe subscribe = cmd.setSubscribe()
                 .setTopic(topic)
@@ -569,7 +578,8 @@ public class Commands {
                 .setReadCompacted(readCompacted)
                 .setInitialPosition(subscriptionInitialPosition)
                 .setReplicateSubscriptionState(isReplicated)
-                .setForceTopicCreation(createTopicIfDoesNotExist);
+                .setForceTopicCreation(createTopicIfDoesNotExist)
+                .setConsumerEpoch(consumerEpoch);
 
         if (subscriptionProperties != null && 
!subscriptionProperties.isEmpty()) {
             List<KeyValue> keyValues = new ArrayList<>();
@@ -1033,10 +1043,11 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
-    public static ByteBuf newRedeliverUnacknowledgedMessages(long consumerId) {
+    public static ByteBuf newRedeliverUnacknowledgedMessages(long consumerId, 
long consumerEpoch) {
         BaseCommand cmd = localCmd(Type.REDELIVER_UNACKNOWLEDGED_MESSAGES);
         cmd.setRedeliverUnacknowledgedMessages()
-                .setConsumerId(consumerId);
+                .setConsumerId(consumerId)
+                .setConsumerEpoch(consumerEpoch);
         return serializeWithSize(cmd);
     }
 
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index b0e343c..fcb516c 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -388,6 +388,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver 
unack message will increase the epoch
+    optional uint64 consumer_epoch = 19;
 }
 
 message CommandPartitionedTopicMetadata {
@@ -531,6 +534,7 @@ message CommandMessage {
     required MessageIdData message_id = 2;
     optional uint32 redelivery_count  = 3 [default = 0];
     repeated int64 ack_set = 4;
+    optional uint64 consumer_epoch = 5;
 }
 
 message CommandAck {
@@ -621,6 +625,7 @@ message CommandCloseConsumer {
 message CommandRedeliverUnacknowledgedMessages {
     required uint64 consumer_id = 1;
     repeated MessageIdData message_ids = 2;
+    optional uint64 consumer_epoch = 3;
 }
 
 message CommandSuccess {

Reply via email to