This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f89d649 [Broker] PIP:84 Redeliver command add epoch. (#10478)
f89d649 is described below
commit f89d6490371177cbb8b3f3b08f1bf2203e420dc3
Author: congbo <[email protected]>
AuthorDate: Mon Feb 14 22:20:12 2022 +0800
[Broker] PIP:84 Redeliver command add epoch. (#10478)
## Motivation
detail in
https://github.com/apache/pulsar/wiki/PIP-84-%3A-Pulsar-client%3A-Redeliver-command-add-epoch.
### Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
---
.../org/apache/pulsar/broker/service/Consumer.java | 26 +-
.../apache/pulsar/broker/service/Dispatcher.java | 2 +-
.../pulsar/broker/service/PulsarCommandSender.java | 2 +-
.../broker/service/PulsarCommandSenderImpl.java | 4 +-
.../apache/pulsar/broker/service/ServerCnx.java | 32 ++-
.../apache/pulsar/broker/service/Subscription.java | 2 +-
.../pulsar/broker/service/SubscriptionOption.java | 1 +
.../nonpersistent/NonPersistentDispatcher.java | 2 +-
.../nonpersistent/NonPersistentSubscription.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 5 +-
.../PersistentDispatcherMultipleConsumers.java | 2 +-
.../PersistentDispatcherSingleActiveConsumer.java | 72 ++++-
...entStreamingDispatcherSingleActiveConsumer.java | 3 +-
.../service/persistent/PersistentSubscription.java | 4 +-
.../broker/service/persistent/PersistentTopic.java | 26 +-
.../pendingack/impl/PendingAckHandleImpl.java | 4 +-
.../apache/pulsar/client/impl/RawReaderImpl.java | 5 +-
.../pulsar/compaction/CompactedTopicImpl.java | 19 +-
.../PersistentDispatcherFailoverConsumerTest.java | 19 +-
.../broker/service/PersistentTopicE2ETest.java | 3 +-
.../pulsar/broker/service/PersistentTopicTest.java | 49 ++--
.../client/impl/CompactedOutBatchMessageTest.java | 5 +-
.../pulsar/client/impl/MessageRedeliveryTest.java | 133 +++++++++-
.../org/apache/pulsar/client/impl/ClientCnx.java | 11 +-
.../apache/pulsar/client/impl/ConsumerBase.java | 28 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 294 +++++++++++++--------
.../org/apache/pulsar/client/impl/MessageImpl.java | 45 ++--
.../client/impl/MessagePayloadContextImpl.java | 14 +-
.../client/impl/MultiTopicsConsumerImpl.java | 30 ++-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 2 +-
.../apache/pulsar/common/protocol/Commands.java | 27 +-
pulsar-common/src/main/proto/PulsarApi.proto | 5 +
32 files changed, 632 insertions(+), 246 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 0d26250..9e53438 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.Future;
@@ -33,6 +34,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -131,11 +134,15 @@ public class Consumer {
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
+ @Getter
+ @Setter
+ private volatile long consumerEpoch;
+
public Consumer(Subscription subscription, SubType subType, String
topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted,
InitialPosition subscriptionInitialPosition,
- KeySharedMeta keySharedMeta, MessageId startMessageId) {
+ KeySharedMeta keySharedMeta, MessageId startMessageId,
long consumerEpoch) {
this.subscription = subscription;
this.subType = subType;
@@ -186,6 +193,7 @@ public class Consumer {
}
this.clientAddress = cnx.clientSourceAddress();
+ this.consumerEpoch = consumerEpoch;
this.isAcknowledgmentAtBatchIndexLevelEnabled =
subscription.getTopic().getBrokerService()
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
}
@@ -214,6 +222,14 @@ public class Consumer {
return readCompacted;
}
+ public Future<Void> sendMessages(final List<Entry> entries,
EntryBatchSizes batchSizes,
+ EntryBatchIndexesAcks batchIndexesAcks,
+ int totalMessages, long totalBytes, long
totalChunkedMessages,
+ RedeliveryTracker redeliveryTracker) {
+ return sendMessages(entries, batchSizes, batchIndexesAcks,
totalMessages, totalBytes,
+ totalChunkedMessages, redeliveryTracker,
DEFAULT_CONSUMER_EPOCH);
+ }
+
/**
* Dispatch a list of entries to the consumer. <br/>
* <b>It is also responsible to release entries data and recycle entries
object.</b>
@@ -223,7 +239,7 @@ public class Consumer {
public Future<Void> sendMessages(final List<Entry> entries,
EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
int totalMessages, long totalBytes, long
totalChunkedMessages,
- RedeliveryTracker redeliveryTracker) {
+ RedeliveryTracker redeliveryTracker, long
epoch) {
this.lastConsumedTimestamp = System.currentTimeMillis();
if (entries.isEmpty() || totalMessages == 0) {
@@ -286,7 +302,7 @@ public class Consumer {
return cnx.getCommandSender().sendMessagesToConsumer(consumerId,
topicName, subscription, partitionIdx,
- entries, batchSizes, batchIndexesAcks, redeliveryTracker);
+ entries, batchSizes, batchIndexesAcks, redeliveryTracker,
epoch);
}
private void incrementUnackedMessages(int ackedMessages) {
@@ -850,7 +866,7 @@ public class Consumer {
return priorityLevel;
}
- public void redeliverUnacknowledgedMessages() {
+ public void redeliverUnacknowledgedMessages(long consumerEpoch) {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
clearUnAckedMsgs();
blockedConsumerOnUnackedMsgs = false;
@@ -875,7 +891,7 @@ public class Consumer {
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(),
totalRedeliveryMessages.intValue());
subscription.redeliverUnacknowledgedMessages(this,
pendingPositions);
} else {
- subscription.redeliverUnacknowledgedMessages(this);
+ subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
}
flowConsumerBlockedPermits(this);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 7dab8b6..918f0d2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -80,7 +80,7 @@ public interface Dispatcher {
SubType getType();
- void redeliverUnacknowledgedMessages(Consumer consumer);
+ void redeliverUnacknowledgedMessages(Consumer consumer, long
consumerEpoch);
void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl>
positions);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 6fcbb6d..0ecda2d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -77,7 +77,7 @@ public interface PulsarCommandSender {
Future<Void> sendMessagesToConsumer(long consumerId, String topicName,
Subscription subscription,
int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
- RedeliveryTracker redeliveryTracker);
+ RedeliveryTracker redeliveryTracker, long epoch);
void sendTcClientConnectResponse(long requestId, ServerError error, String
message);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 03decb4..7c1a920 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -217,7 +217,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
@Override
public ChannelPromise sendMessagesToConsumer(long consumerId, String
topicName, Subscription subscription,
int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
- RedeliveryTracker redeliveryTracker) {
+ RedeliveryTracker redeliveryTracker, long epoch) {
final ChannelHandlerContext ctx = cnx.ctx();
final ChannelPromise writePromise = ctx.newPromise();
ctx.channel().eventLoop().execute(() -> {
@@ -270,7 +270,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
ctx.write(
cnx.newMessageAndIntercept(consumerId,
entry.getLedgerId(), entry.getEntryId(), partitionIdx,
redeliveryCount, metadataAndPayload,
- batchIndexesAcks == null ? null :
batchIndexesAcks.getAckSet(i), topicName),
+ batchIndexesAcks == null ? null :
batchIndexesAcks.getAckSet(i), topicName, epoch),
ctx.voidPromise());
entry.release();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0dfa56f4..ef09a80 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -23,6 +23,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
@@ -952,6 +953,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
subscriptionName,
TopicOperation.CONSUME
);
+
+ // Make sure the consumer future is put into the consumers map first
to avoid the same consumer
+ // epoch using different consumer futures, and only remove the
consumer future from the map
+ // if subscribe failed .
+ CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
+ CompletableFuture<Consumer> existingConsumerFuture =
+ consumers.putIfAbsent(consumerId, consumerFuture);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
@@ -964,12 +972,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
Metadata.validateMetadata(metadata);
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
+ consumers.remove(consumerId, consumerFuture);
commandSender.sendErrorResponse(requestId,
ServerError.MetadataError, msg);
return null;
}
- CompletableFuture<Consumer> consumerFuture = new
CompletableFuture<>();
- CompletableFuture<Consumer> existingConsumerFuture =
consumers.putIfAbsent(consumerId,
- consumerFuture);
if (existingConsumerFuture != null) {
if (existingConsumerFuture.isDone() &&
!existingConsumerFuture.isCompletedExceptionally()) {
@@ -1024,6 +1030,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
new
SubscriptionNotFoundException(
"Subscription does not
exist"));
}
+ long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+ if (subscribe.hasConsumerEpoch()) {
+ consumerEpoch = subscribe.getConsumerEpoch();
+ }
SubscriptionOption option =
SubscriptionOption.builder().cnx(ServerCnx.this)
.subscriptionName(subscriptionName)
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
@@ -1034,6 +1044,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
.subscriptionProperties(SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList()))
+ .consumerEpoch(consumerEpoch)
.build();
if (schema != null) {
return
topic.addSchemaIfIdleOrCheckCompatible(schema)
@@ -1101,11 +1112,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg,
getPrincipal());
+ consumers.remove(consumerId, consumerFuture);
ctx.writeAndFlush(Commands.newError(requestId,
ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "subscribe", getPrincipal(),
Optional.of(topicName), ex);
+ consumers.remove(consumerId, consumerFuture);
commandSender.sendErrorResponse(requestId,
ServerError.AuthorizationError, ex.getMessage());
return null;
});
@@ -1533,7 +1546,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
protected void
handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver)
{
checkArgument(state == State.Connected);
if (log.isDebugEnabled()) {
- log.debug("[{}] Received Resend Command from consumer {} ",
remoteAddress, redeliver.getConsumerId());
+ log.debug("[{}] redeliverUnacknowledged from consumer {},
consumerEpoch {}",
+ remoteAddress, redeliver.getConsumerId(),
redeliver.getConsumerEpoch());
}
CompletableFuture<Consumer> consumerFuture =
consumers.get(redeliver.getConsumerId());
@@ -1543,7 +1557,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (redeliver.getMessageIdsCount() > 0 &&
Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
- consumer.redeliverUnacknowledgedMessages();
+ if (redeliver.hasConsumerEpoch()) {
+
consumer.redeliverUnacknowledgedMessages(redeliver.getConsumerEpoch());
+ } else {
+
consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH);
+ }
}
}
}
@@ -2693,9 +2711,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId,
long entryId, int partition,
- int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet,
String topic) {
+ int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet,
String topic, long epoch) {
BaseCommand command = Commands.newMessageCommand(consumerId, ledgerId,
entryId, partition, redeliveryCount,
- ackSet);
+ ackSet, epoch);
ByteBufPair res = Commands.serializeCommandMessageWithSize(command,
metadataAndPayload);
try {
val brokerInterceptor = getBrokerService().getInterceptor();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 555b10d..06db648 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -87,7 +87,7 @@ public interface Subscription {
boolean expireMessages(Position position);
- void redeliverUnacknowledgedMessages(Consumer consumer);
+ void redeliverUnacknowledgedMessages(Consumer consumer, long
consumerEpoch);
void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl>
positions);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
index 278e008..d7503f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
@@ -48,6 +48,7 @@ public class SubscriptionOption {
private boolean replicatedSubscriptionStateArg;
private KeySharedMeta keySharedMeta;
private Optional<Map<String, String>> subscriptionProperties;
+ private long consumerEpoch;
public static Optional<Map<String, String>>
getPropertiesMap(List<KeyValue> list) {
if (list == null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
index c5d4ec4..1ef2db2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcher.java
@@ -56,7 +56,7 @@ public interface NonPersistentDispatcher extends Dispatcher {
boolean hasPermits();
@Override
- default void redeliverUnacknowledgedMessages(Consumer consumer) {
+ default void redeliverUnacknowledgedMessages(Consumer consumer, long
consumerEpoch) {
// No-op
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 056fd0f..790725f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -471,7 +471,7 @@ public class NonPersistentSubscription implements
Subscription {
}
@Override
- public synchronized void redeliverUnacknowledgedMessages(Consumer
consumer) {
+ public synchronized void redeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoch) {
// No-op
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index a90fb30..86d74a7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.nonpersistent;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
import static
org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -302,9 +303,11 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
NonPersistentSubscription subscription =
subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this,
subscriptionName, isDurable));
+
Consumer consumer = new Consumer(subscription, subType, topic,
consumerId, priorityLevel, consumerName,
false, cnx, cnx.getAuthRole(), metadata, readCompacted,
initialPosition, keySharedMeta,
- MessageId.latest);
+ MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
addConsumerToSubscription(subscription, consumer).thenRun(() -> {
if (!cnx.isActive()) {
try {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0def1227..9bc3598 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -728,7 +728,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
@Override
- public synchronized void redeliverUnacknowledgedMessages(Consumer
consumer) {
+ public synchronized void redeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoch) {
consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize,
stickyKeyHash) -> {
addMessageToReplay(ledgerId, entryId, stickyKeyHash);
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4937e86..d732523 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import io.netty.util.Recycler;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -148,7 +150,10 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
}
public synchronized void internalReadEntriesComplete(final List<Entry>
entries, Object obj) {
- Consumer readConsumer = (Consumer) obj;
+ ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx) obj;
+ Consumer readConsumer = readEntriesCtx.getConsumer();
+ long epoch = readEntriesCtx.getEpoch();
+ readEntriesCtx.recycle();
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Got messages: {}", name, readConsumer,
entries.size());
}
@@ -201,17 +206,17 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchIndexesAcks batchIndexesAcks =
EntryBatchIndexesAcks.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, false);
- dispatchEntriesToConsumer(currentConsumer, entries, batchSizes,
batchIndexesAcks, sendMessageInfo);
+ dispatchEntriesToConsumer(currentConsumer, entries, batchSizes,
batchIndexesAcks, sendMessageInfo, epoch);
}
}
protected void dispatchEntriesToConsumer(Consumer currentConsumer,
List<Entry> entries,
EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
- SendMessageInfo sendMessageInfo) {
+ SendMessageInfo sendMessageInfo,
long epoch) {
currentConsumer
.sendMessages(entries, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(),
- redeliveryTracker)
+ redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
int permits = dispatchThrottlingOnBatchMessageEnabled ?
entries.size()
@@ -283,13 +288,13 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
}
@Override
- public void redeliverUnacknowledgedMessages(Consumer consumer) {
+ public void redeliverUnacknowledgedMessages(Consumer consumer, long
consumerEpoch) {
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
- internalRedeliverUnacknowledgedMessages(consumer);
+ internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch);
}));
}
- private synchronized void internalRedeliverUnacknowledgedMessages(Consumer
consumer) {
+ private synchronized void internalRedeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoch) {
if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only
the active consumer can call resend",
name, consumer);
@@ -305,6 +310,9 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
cancelPendingRead();
if (!havePendingRead) {
+ if (consumerEpoch > consumer.getConsumerEpoch()) {
+ consumer.setConsumerEpoch(consumerEpoch);
+ }
cursor.rewind();
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Cursor rewinded, redelivering
unacknowledged messages. ", name, consumer);
@@ -321,7 +329,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
public void redeliverUnacknowledgedMessages(Consumer consumer,
List<PositionImpl> positions) {
// We cannot redeliver single messages to single consumers to preserve
ordering.
positions.forEach(redeliveryTracker::addIfAbsent);
- redeliverUnacknowledgedMessages(consumer);
+ redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH);
}
@Override
@@ -351,8 +359,10 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor,
messagesToRead, isFirstRead,
this, consumer);
} else {
+ ReadEntriesCtx readEntriesCtx =
+ ReadEntriesCtx.create(consumer,
consumer.getConsumerEpoch());
cursor.asyncReadEntriesOrWait(messagesToRead,
- bytesToRead, this, consumer,
topic.getMaxReadPosition());
+ bytesToRead, this, readEntriesCtx,
topic.getMaxReadPosition());
}
} else {
if (log.isDebugEnabled()) {
@@ -467,7 +477,9 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
private synchronized void internalReadEntriesFailed(ManagedLedgerException
exception, Object ctx) {
havePendingRead = false;
- Consumer c = (Consumer) ctx;
+ ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx) ctx;
+ Consumer c = readEntriesCtx.getConsumer();
+ readEntriesCtx.recycle();
long waitTimeMillis = readFailureBackoff.next();
@@ -584,4 +596,44 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
}
private static final Logger log =
LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
+
+ public static class ReadEntriesCtx {
+
+ private Consumer consumer;
+ private long epoch;
+
+ private final Recycler.Handle<ReadEntriesCtx> recyclerHandle;
+
+ private ReadEntriesCtx(Recycler.Handle<ReadEntriesCtx> recyclerHandle)
{
+ this.recyclerHandle = recyclerHandle;
+ }
+ private static final Recycler<ReadEntriesCtx> RECYCLER =
+ new Recycler<ReadEntriesCtx>() {
+ @Override
+ protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx>
recyclerHandle) {
+ return new ReadEntriesCtx(recyclerHandle);
+ }
+ };
+
+ public static ReadEntriesCtx create(Consumer consumer, long epoch) {
+ ReadEntriesCtx readEntriesCtx = RECYCLER.get();
+ readEntriesCtx.consumer = consumer;
+ readEntriesCtx.epoch = epoch;
+ return readEntriesCtx;
+ }
+
+ Consumer getConsumer() {
+ return consumer;
+ }
+
+ long getEpoch() {
+ return epoch;
+ }
+
+ public void recycle() {
+ consumer = null;
+ epoch = 0;
+ recyclerHandle.recycle(this);
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 2e92232..0004ffd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.collect.Lists;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@@ -169,7 +170,7 @@ public class
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
.getNextValidPosition((PositionImpl) entry.getPosition()));
dispatchEntriesToConsumer(currentConsumer,
Lists.newArrayList(entry), batchSizes,
- batchIndexesAcks, sendMessageInfo);
+ batchIndexesAcks, sendMessageInfo, DEFAULT_CONSUMER_EPOCH);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 80c7869..6d74b53 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1031,10 +1031,10 @@ public class PersistentSubscription implements
Subscription {
}
@Override
- public void redeliverUnacknowledgedMessages(Consumer consumer) {
+ public void redeliverUnacknowledgedMessages(Consumer consumer, long
consumerEpoch) {
Dispatcher dispatcher = getDispatcher();
if (dispatcher != null) {
- dispatcher.redeliverUnacknowledgedMessages(consumer);
+ dispatcher.redeliverUnacknowledgedMessages(consumer,
consumerEpoch);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 78610f9..9337fb7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.annotations.VisibleForTesting;
@@ -652,18 +653,20 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(),
option.getInitialPosition(),
option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(),
-
option.getSubscriptionProperties().orElse(Collections.emptyMap()));
+
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
option.getConsumerEpoch());
}
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx
cnx, String subscriptionName,
- long consumerId, SubType
subType, int priorityLevel,
- String consumerName, boolean
isDurable, MessageId startMessageId,
- Map<String, String> metadata,
boolean readCompacted,
- InitialPosition
initialPosition,
- long
startMessageRollbackDurationSec,
- boolean
replicatedSubscriptionStateArg,
- KeySharedMeta keySharedMeta,
- Map<String, String>
subscriptionProperties) {
+ long consumerId,
SubType subType, int priorityLevel,
+ String consumerName,
boolean isDurable,
+ MessageId
startMessageId,
+ Map<String, String>
metadata, boolean readCompacted,
+ InitialPosition
initialPosition,
+ long
startMessageRollbackDurationSec,
+ boolean
replicatedSubscriptionStateArg,
+ KeySharedMeta
keySharedMeta,
+ Map<String, String>
subscriptionProperties,
+ long consumerEpoch) {
if (readCompacted && !(subType == SubType.Failover || subType ==
SubType.Exclusive)) {
return FutureUtil.failedFuture(new NotAllowedException(
"readCompacted only allowed on failover or exclusive
subscriptions"));
@@ -750,7 +753,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
CompletableFuture<Consumer> future =
subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic,
consumerId, priorityLevel,
consumerName, isDurable, cnx, cnx.getAuthRole(),
metadata,
- readCompacted, initialPosition, keySharedMeta,
startMessageId);
+ readCompacted, initialPosition, keySharedMeta,
startMessageId, consumerEpoch);
+
return addConsumerToSubscription(subscription,
consumer).thenCompose(v -> {
checkBackloggedCursors();
if (!cnx.isActive()) {
@@ -821,7 +825,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType,
priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted,
initialPosition, startMessageRollbackDurationSec,
- replicatedSubscriptionStateArg, keySharedMeta, null);
+ replicatedSubscriptionStateArg, keySharedMeta, null,
DEFAULT_CONSUMER_EPOCH);
}
private CompletableFuture<Subscription> getDurableSubscription(String
subscriptionName,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 6fafc68..f001b39 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
import static
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
import static
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -514,7 +515,8 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
if (cumulativeAckOfTransaction.getKey().equals(txnId))
{
cumulativeAckOfTransaction = null;
}
-
persistentSubscription.redeliverUnacknowledgedMessages(consumer);
+ //TODO: pendingAck handle next pr will fix
+
persistentSubscription.redeliverUnacknowledgedMessages(consumer,
DEFAULT_CONSUMER_EPOCH);
abortFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}] Transaction pending ack store abort
txnId : [{}] fail!",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 217dd5c..d391e15 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandMessage;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
@@ -201,8 +202,8 @@ public class RawReaderImpl implements RawReader {
}
@Override
- void messageReceived(MessageIdData messageId, int redeliveryCount,
- List<Long> ackSet, ByteBuf headersAndPayload,
ClientCnx cnx) {
+ void messageReceived(CommandMessage commandMessage, ByteBuf
headersAndPayload, ClientCnx cnx) {
+ MessageIdData messageId = commandMessage.getMessageId();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received raw message: {}/{}/{}", topic,
subscription,
messageId.getEntryId(), messageId.getLedgerId(),
messageId.getPartition());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index f379500..2431d14 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.compaction;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.ComparisonChain;
@@ -42,6 +43,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
+import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawMessageImpl;
@@ -94,9 +96,12 @@ public class CompactedTopicImpl implements CompactedTopic {
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
}
+
+ // TODO: redeliver epoch link
https://github.com/apache/pulsar/issues/13690
+ ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer,
DEFAULT_CONSUMER_EPOCH);
if (compactionHorizon == null
|| compactionHorizon.compareTo(cursorPosition) < 0) {
- cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback,
consumer, PositionImpl.LATEST);
+ cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback,
readEntriesCtx, PositionImpl.LATEST);
} else {
compactedTopicContext.thenCompose(
(context) -> findStartPoint(cursorPosition,
context.ledger.getLastAddConfirmed(), context.cache)
@@ -105,11 +110,11 @@ public class CompactedTopicImpl implements CompactedTopic
{
// the cursor just needs to be set to the
compaction horizon
if (startPoint == COMPACT_LEDGER_EMPTY) {
cursor.seek(compactionHorizon.getNext());
-
callback.readEntriesComplete(Collections.emptyList(), consumer);
+
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
return CompletableFuture.completedFuture(null);
}
if (startPoint == NEWER_THAN_COMPACTED &&
compactionHorizon.compareTo(cursorPosition) < 0) {
-
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer,
+
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx,
PositionImpl.LATEST);
return CompletableFuture.completedFuture(null);
} else {
@@ -117,7 +122,7 @@ public class CompactedTopicImpl implements CompactedTopic {
startPoint +
numberOfEntriesToRead);
if (startPoint == NEWER_THAN_COMPACTED) {
cursor.seek(compactionHorizon.getNext());
-
callback.readEntriesComplete(Collections.emptyList(), consumer);
+
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
return
CompletableFuture.completedFuture(null);
}
return readEntries(context.ledger, startPoint,
endPoint)
@@ -129,16 +134,16 @@ public class CompactedTopicImpl implements CompactedTopic
{
// the complete last snapshot because
of the compactor will read the data
// before the compaction cursor mark
delete position
cursor.seek(lastEntry.getPosition().getNext(), true);
- callback.readEntriesComplete(entries,
consumer);
+ callback.readEntriesComplete(entries,
readEntriesCtx);
});
}
}))
.exceptionally((exception) -> {
if (exception.getCause() instanceof
NoSuchElementException) {
cursor.seek(compactionHorizon.getNext());
-
callback.readEntriesComplete(Collections.emptyList(), consumer);
+
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
} else {
- callback.readEntriesFailed(new
ManagedLedgerException(exception), consumer);
+ callback.readEntriesFailed(new
ManagedLedgerException(exception), readEntriesCtx);
}
return null;
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 5700b1c..7e697e6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.ArgumentMatchers.same;
@@ -293,7 +294,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 2. Add old consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0,
- "Cons1"/* consumer name */, true, serverCnxWithOldVersion,
"myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null,
MessageId.latest);
+ "Cons1"/* consumer name */, true, serverCnxWithOldVersion,
"myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -304,7 +305,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 3. Add new consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive,
topic.getName(), 2 /* consumer id */, 0,
- "Cons2"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
+ "Cons2"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -333,7 +334,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 2. Add consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest));
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -357,7 +358,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 5. Add another consumer which does not change active consumer
Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive,
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
- true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null, MessageId.latest));
+ true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH));
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(),
consumer1.consumerName());
@@ -371,7 +372,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 6. Add a consumer which changes active consumer
Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive,
topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest));
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH));
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertSame(pdfc.getActiveConsumer().consumerName(),
consumer0.consumerName());
@@ -454,7 +455,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 2. Add a consumer
Consumer consumer1 = spy(new Consumer(sub, SubType.Failover,
topic.getName(), 1 /* consumer id */, 1,
"Cons1"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest));
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH));
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertEquals(1, consumers.size());
@@ -463,7 +464,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 3. Add a consumer with same priority level and consumer name is
smaller in lexicographic order.
Consumer consumer2 = spy(new Consumer(sub, SubType.Failover,
topic.getName(), 2 /* consumer id */, 1,
"Cons2"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest));
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH));
pdfc.addConsumer(consumer2);
// 4. Verify active consumer doesn't change
@@ -476,7 +477,7 @@ public class PersistentDispatcherFailoverConsumerTest {
// 5. Add another consumer which has higher priority level
Consumer consumer3 = spy(new Consumer(sub, SubType.Failover,
topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
- true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null, MessageId.latest));
+ true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH));
pdfc.addConsumer(consumer3);
consumers = pdfc.getConsumers();
assertEquals(3, consumers.size());
@@ -667,7 +668,7 @@ public class PersistentDispatcherFailoverConsumerTest {
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
Consumer consumer =
new Consumer(sub, SubType.Shared, "test-topic", id, priority,
""+id, true,
- serverCnx, "appId", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null, MessageId.latest);
+ serverCnx, "appId", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null,
MessageId.latest,DEFAULT_CONSUMER_EPOCH);
try {
consumer.flowPermits(permit);
} catch (Exception e) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 570c6cd..ac5230d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -1728,7 +1729,7 @@ public class PersistentTopicE2ETest extends
BrokerTestBase {
redeliveryMessagesField.set(dispatcher, redeliveryMessages);
// (a) redelivery with all acked-message should clear messageReply
bucket
-
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
+
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0),
DEFAULT_CONSUMER_EPOCH);
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
return redeliveryMessages.isEmpty();
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 68e0f4f..6d7882f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.anyString;
@@ -753,7 +754,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Consumer consumer = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1, 0, "Cons1", true, serverCnx,
"myrole-1", Collections.emptyMap(), false,
InitialPosition.Latest,
- new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest);
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer);
consumer.close();
@@ -764,7 +765,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
consumer = new Consumer(sub, subType, topic.getName(), 1, 0,
"Cons1", true, serverCnx, "myrole-1",
Collections.emptyMap(), false, InitialPosition.Latest,
- new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest);
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
@@ -787,7 +788,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// 1. simple add consumer
Consumer consumer = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
- true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null, MessageId.latest);
+ true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
@@ -820,7 +821,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
PersistentSubscription sub = new PersistentSubscription(topic,
"non-durable-sub", cursorMock, false);
Consumer consumer = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1, 0, "Cons1", true, serverCnx,
- "myrole-1", Collections.emptyMap(), false,
InitialPosition.Latest, null, MessageId.latest);
+ "myrole-1", Collections.emptyMap(), false,
InitialPosition.Latest, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer);
assertFalse(sub.getDispatcher().isClosed());
@@ -858,14 +859,14 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// 1. add consumer1
Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(),
1 /* consumer id */, 0,
"Cons1"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription.invoke(topic, sub, consumer);
assertEquals(sub.getConsumers().size(), 1);
// 2. add consumer2
Consumer consumer2 = new Consumer(sub, SubType.Shared,
topic.getName(), 2 /* consumer id */, 0,
"Cons2"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription.invoke(topic, sub, consumer2);
assertEquals(sub.getConsumers().size(), 2);
@@ -873,7 +874,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
try {
Consumer consumer3 = new Consumer(sub, SubType.Shared,
topic.getName(), 3 /* consumer id */, 0,
"Cons3"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic,
sub, consumer3)).get();
fail("should have failed");
} catch (ExecutionException e) {
@@ -886,7 +887,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// 4. add consumer4 to sub2
Consumer consumer4 = new Consumer(sub2, SubType.Shared,
topic.getName(), 4 /* consumer id */, 0,
"Cons4"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription.invoke(topic, sub2, consumer4);
assertEquals(sub2.getConsumers().size(), 1);
@@ -897,7 +898,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
try {
Consumer consumer5 = new Consumer(sub2, SubType.Shared,
topic.getName(), 5 /* consumer id */, 0,
"Cons5"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic,
sub2, consumer5)).get();
fail("should have failed");
} catch (ExecutionException e) {
@@ -962,14 +963,14 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// 1. add consumer1
Consumer consumer = new Consumer(sub, SubType.Failover,
topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription.invoke(topic, sub, consumer);
assertEquals(sub.getConsumers().size(), 1);
// 2. add consumer2
Consumer consumer2 = new Consumer(sub, SubType.Failover,
topic.getName(), 2 /* consumer id */, 0,
"Cons2"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription.invoke(topic, sub, consumer2);
assertEquals(sub.getConsumers().size(), 2);
@@ -977,7 +978,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
try {
Consumer consumer3 = new Consumer(sub, SubType.Failover,
topic.getName(), 3 /* consumer id */, 0,
"Cons3"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic,
sub, consumer3)).get();
fail("should have failed");
} catch (ExecutionException e) {
@@ -990,7 +991,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// 4. add consumer4 to sub2
Consumer consumer4 = new Consumer(sub2, SubType.Failover,
topic.getName(), 4 /* consumer id */, 0,
"Cons4"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription.invoke(topic, sub2, consumer4);
assertEquals(sub2.getConsumers().size(), 1);
@@ -1001,7 +1002,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
try {
Consumer consumer5 = new Consumer(sub2, SubType.Failover,
topic.getName(), 5 /* consumer id */, 0,
"Cons5"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(),
- false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest);
+ false /* read compacted */, InitialPosition.Latest, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic,
sub2, consumer5)).get();
fail("should have failed");
} catch (ExecutionException e) {
@@ -1055,7 +1056,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(new PulsarCommandSenderImpl(null,
cnx)).when(cnx).getCommandSender();
return new Consumer(sub, SubType.Shared, topic.getName(), consumerId,
0, consumerNameBase + consumerId, true,
- cnx, role, Collections.emptyMap(), false,
InitialPosition.Latest, null, MessageId.latest);
+ cnx, role, Collections.emptyMap(), false,
InitialPosition.Latest, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
}
@Test
@@ -1163,7 +1164,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
Consumer consumer1 = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
- true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null, MessageId.latest);
+ true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer1);
doAnswer(new Answer<Object>() {
@@ -1187,7 +1188,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Thread.sleep(10); /* delay to ensure that the ubsubscribe gets
executed first */
sub.addConsumer(new Consumer(sub, SubType.Exclusive,
topic.getName(), 2 /* consumer id */,
0, "Cons2"/* consumer name */, true, serverCnx,
- "myrole-1", Collections.emptyMap(), false /* read
compacted */, InitialPosition.Latest, null, MessageId.latest)).get();
+ "myrole-1", Collections.emptyMap(), false /* read
compacted */, InitialPosition.Latest, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH)).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof
BrokerServiceException.SubscriptionFencedException);
@@ -1965,21 +1966,21 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
ManagedCursor cursor1 = ledger.openCursor("c1");
PersistentSubscription sub1 = new PersistentSubscription(topic,
"sub-1", cursor1, false);
Consumer consumer1 = new Consumer(sub1, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
- true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read
compacted */, InitialPosition.Latest, null, MessageId.latest);
+ true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read
compacted */, InitialPosition.Latest, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1);
sub1.addConsumer(consumer1);
// Open cursor2, add it into activeCursor-container and add it into
subscription consumer list
ManagedCursor cursor2 = ledger.openCursor("c2");
PersistentSubscription sub2 = new PersistentSubscription(topic,
"sub-2", cursor2, false);
Consumer consumer2 = new Consumer(sub2, SubType.Exclusive,
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
- true, serverCnx, "myrole-2", Collections.emptyMap(), false /* read
compacted */, InitialPosition.Latest, null, MessageId.latest);
+ true, serverCnx, "myrole-2", Collections.emptyMap(), false /* read
compacted */, InitialPosition.Latest, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2);
sub2.addConsumer(consumer2);
// Open cursor3, add it into activeCursor-container and do not add it
into subscription consumer list
ManagedCursor cursor3 = ledger.openCursor("c3");
PersistentSubscription sub3 = new PersistentSubscription(topic,
"sub-3", cursor3, false);
Consumer consumer3 = new Consumer(sub2, SubType.Exclusive,
topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */,
- true, serverCnx, "myrole-3", Collections.emptyMap(), false /* read
compacted */, InitialPosition.Latest, null, MessageId.latest);
+ true, serverCnx, "myrole-3", Collections.emptyMap(), false /* read
compacted */, InitialPosition.Latest, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3);
// Case1: cursors are active as haven't started
deactivateBacklogCursor scan
@@ -2089,7 +2090,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
addConsumerToSubscription.setAccessible(true);
Consumer consumer = new Consumer(nonDeletableSubscription1,
SubType.Shared, topic.getName(), 1, 0, "consumer1",
- true, serverCnx, "app1", Collections.emptyMap(), false,
InitialPosition.Latest, null, MessageId.latest);
+ true, serverCnx, "app1", Collections.emptyMap(), false,
InitialPosition.Latest, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription.invoke(topic, nonDeletableSubscription1,
consumer);
NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
@@ -2212,7 +2213,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Consumer consumer1 = new Consumer(sub1, SubType.Key_Shared,
topic.getName(), 1, 0, "Cons1", true, serverCnx,
"myrole-1", Collections.emptyMap(), false,
InitialPosition.Latest,
new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(false),
- MessageId.latest);
+ MessageId.latest, DEFAULT_CONSUMER_EPOCH);
sub1.addConsumer(consumer1);
consumer1.close();
@@ -2223,7 +2224,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Consumer consumer2 = new Consumer(sub2, SubType.Key_Shared,
topic.getName(), 2, 0, "Cons2", true, serverCnx,
"myrole-1", Collections.emptyMap(), false,
InitialPosition.Latest,
new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT).setAllowOutOfOrderDelivery(true),
- MessageId.latest);
+ MessageId.latest, DEFAULT_CONSUMER_EPOCH);
sub2.addConsumer(consumer2);
consumer2.close();
@@ -2235,7 +2236,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
.setAllowOutOfOrderDelivery(false);
ksm.addHashRange().setStart(0).setEnd(65535);
Consumer consumer3 = new Consumer(sub3, SubType.Key_Shared,
topic.getName(), 3, 0, "Cons3", true, serverCnx,
- "myrole-1", Collections.emptyMap(), false,
InitialPosition.Latest, ksm, MessageId.latest);
+ "myrole-1", Collections.emptyMap(), false,
InitialPosition.Latest, ksm, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
sub3.addConsumer(consumer3);
consumer3.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
index 2358e01..f3fd353c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -18,11 +18,10 @@
*/
package org.apache.pulsar.client.impl;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.testng.Assert.assertEquals;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -82,7 +81,7 @@ public class CompactedOutBatchMessageTest extends
ProducerConsumerBase {
.subscriptionName("my-subscriber-name").subscribe()) {
// shove it in the sideways
consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata,
metadata, 0, null,
- batchBuffer, new
MessageIdData().setLedgerId(1234).setEntryId(567), consumer.cnx());
+ batchBuffer, new
MessageIdData().setLedgerId(1234).setEntryId(567), consumer.cnx(),
DEFAULT_CONSUMER_EPOCH);
Message<?> m = consumer.receive();
assertEquals(((BatchMessageIdImpl)m.getMessageId()).getLedgerId(),
1234);
assertEquals(((BatchMessageIdImpl)m.getMessageId()).getEntryId(),
567);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
index 8b430e7..b7ff6b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
@@ -22,7 +22,7 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
-
+import java.lang.reflect.Field;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@@ -30,10 +30,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -41,16 +41,16 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-
import com.google.common.collect.Sets;
-
import io.netty.util.concurrent.DefaultThreadFactory;
@Test(groups = "broker-impl")
@@ -257,4 +257,129 @@ public class MessageRedeliveryTest extends
ProducerConsumerBase {
assertNull(message);
}
+ @Test(dataProvider = "enableBatch")
+ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
+ final String topic = "testRedeliveryAddEpoch";
+ final String subName = "my-sub";
+ ConsumerBase<String> consumer = ((ConsumerBase<String>)
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe());
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(enableBatch)
+ .create();
+
+ String test1 = "Pulsar1";
+ String test2 = "Pulsar2";
+ String test3 = "Pulsar3";
+ producer.send(test1);
+
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopics()
+ .get(TopicName.get("persistent://public/default/" +
topic).toString()).get().get();
+ PersistentDispatcherSingleActiveConsumer
persistentDispatcherSingleActiveConsumer =
+ (PersistentDispatcherSingleActiveConsumer)
persistentTopic.getSubscription(subName).getDispatcher();
+
+ consumer.setConsumerEpoch(1);
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNull(message);
+ consumer.redeliverUnacknowledgedMessages();
+ message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(message);
+ consumer.acknowledgeCumulativeAsync(message).get();
+ assertEquals(message.getValue(), test1);
+
+ consumer.setConsumerEpoch(3);
+
+ producer.send(test2);
+ message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNull(message);
+
+ consumer.redeliverUnacknowledgedMessages();
+ message = consumer.receive(3, TimeUnit.SECONDS);
+ consumer.acknowledgeCumulativeAsync(message).get();
+ consumer.redeliverUnacknowledgedMessages();
+ assertNotNull(message);
+ assertEquals(message.getValue(), test2);
+
+ consumer.setConsumerEpoch(6);
+ producer.send(test3);
+ message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNull(message);
+
+ Field field =
consumer.getClass().getDeclaredField("connectionHandler");
+ field.setAccessible(true);
+ ConnectionHandler connectionHandler = (ConnectionHandler)
field.get(consumer);
+
+ field =
connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
+ field.setAccessible(true);
+
+ connectionHandler.cnx().channel().close();
+
+ ((ConsumerImpl<String>) consumer).grabCnx();
+ message = consumer.receive(3, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertEquals(message.getValue(), test3);
+ }
+
+ @DataProvider(name = "enableBatch")
+ public static Object[][] enableBatch() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
+
+ @Test(dataProvider = "enableBatch")
+ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch)
throws Exception{
+ final String topic = "testMultiConsumerRedeliveryAddEpoch";
+ final String subName = "my-sub";
+ admin.topics().createPartitionedTopic(topic, 5);
+ final int messageNumber = 50;
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(enableBatch)
+ .create();
+
+ for (int i = 0; i < messageNumber; i++) {
+ producer.send("" + i);
+ }
+
+ for (int i = 0; i < messageNumber; i++) {
+ consumer.receive();
+ }
+
+ // redeliverUnacknowledgedMessages once
+ consumer.redeliverUnacknowledgedMessages();
+
+ Message<String> message;
+ for (int i = 0; i < messageNumber; i++) {
+ message = consumer.receive();
+ // message consumer epoch is 1
+ assertEquals((((MessageImpl)((TopicMessageImpl)
message).getMessage())).getConsumerEpoch(), 1);
+ }
+
+ // can't receive message again
+ message = consumer.receive(5, TimeUnit.SECONDS);
+ assertNull(message);
+
+ // redeliverUnacknowledgedMessages twice
+ consumer.redeliverUnacknowledgedMessages();
+
+ for (int i = 0; i < messageNumber; i++) {
+ message = consumer.receive();
+ // message consumer epoch is 2
+ assertEquals((((MessageImpl)((TopicMessageImpl)
message).getMessage())).getConsumerEpoch(), 2);
+ }
+
+ // can't receive message again
+ message = consumer.receive(5, TimeUnit.SECONDS);
+ assertNull(message);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 934cff6..e7df294 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -37,7 +37,6 @@ import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
@@ -448,15 +447,7 @@ public class ClientCnx extends PulsarHandler {
}
ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
if (consumer != null) {
- List<Long> ackSets = Collections.emptyList();
- if (cmdMessage.getAckSetsCount() > 0) {
- ackSets = new ArrayList<>(cmdMessage.getAckSetsCount());
- for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
- ackSets.add(cmdMessage.getAckSetAt(i));
- }
- }
- consumer.messageReceived(cmdMessage.getMessageId(),
cmdMessage.getRedeliveryCount(), ackSets,
- headersAndPayload, this);
+ consumer.messageReceived(cmdMessage, headersAndPayload, this);
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 242bb9f..7f0575a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.collect.Queues;
import io.netty.util.Timeout;
import java.nio.charset.StandardCharsets;
@@ -37,6 +38,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
@@ -53,6 +56,7 @@ import
org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -86,6 +90,13 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected final Lock reentrantLock = new ReentrantLock();
private volatile boolean isListenerHandlingMessage = false;
+ protected static final AtomicLongFieldUpdater<ConsumerBase> CONSUMER_EPOCH
=
+ AtomicLongFieldUpdater.newUpdater(ConsumerBase.class,
"consumerEpoch");
+
+ @Setter
+ @Getter
+ protected volatile long consumerEpoch;
+
protected ConsumerBase(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorProvider
executorProvider,
CompletableFuture<Consumer<T>> subscribeFuture,
Schema<T> schema,
@@ -1054,5 +1065,22 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
return executor;
}
+ // If message consumer epoch is smaller than consumer epoch present that
+ // it has been sent to the client before the user calls
redeliverUnacknowledgedMessages, this message is invalid.
+ // so we should release this message and receive again
+ protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
+ if ((getSubType() == CommandSubscribe.SubType.Failover
+ || getSubType() == CommandSubscribe.SubType.Exclusive)
+ && message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH
+ && message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) {
+ log.warn("Consumer filter old epoch message, topic : [{}],
messageId : [{}], consumerEpoch : [{}]",
+ topic, message.getMessageId(), consumerEpoch);
+ message.release();
+ message.recycle();
+ return false;
+ }
+ return true;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ConsumerBase.class);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 60f9ca7..e09a934 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.collect.ComparisonChain;
@@ -86,6 +87,7 @@ import
org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
+import org.apache.pulsar.common.api.proto.CommandMessage;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.EncryptionKeys;
@@ -196,7 +198,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new
CopyOnWriteArrayList<Throwable>();
-
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T>
conf,
@@ -419,6 +420,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
try {
message = incomingMessages.take();
messageProcessed(message);
+ if (!isValidConsumerEpoch(message)) {
+ return internalReceive();
+ }
return beforeConsume(message);
} catch (InterruptedException e) {
stats.incrementNumReceiveFailed();
@@ -426,6 +430,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}
+ private boolean isValidConsumerEpoch(Message<T> message) {
+ return isValidConsumerEpoch((MessageImpl<T>) message);
+ }
+
@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
@@ -437,6 +445,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
} else {
messageProcessed(message);
+ if (!isValidConsumerEpoch(message)) {
+ pendingReceives.add(result);
+ cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
+ return;
+ }
result.complete(beforeConsume(message));
}
});
@@ -447,12 +460,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@Override
protected Message<T> internalReceive(int timeout, TimeUnit unit) throws
PulsarClientException {
Message<T> message;
+ long callTime = System.nanoTime();
try {
message = incomingMessages.poll(timeout, unit);
if (message == null) {
return null;
}
messageProcessed(message);
+ if (!isValidConsumerEpoch(message)) {
+ long executionTime = System.nanoTime() - callTime;
+ if (executionTime >= unit.toNanos(timeout)) {
+ return null;
+ } else {
+ return internalReceive((int) (timeout - executionTime),
TimeUnit.NANOSECONDS);
+ }
+ }
return beforeConsume(message);
} catch (InterruptedException e) {
State state = getState();
@@ -492,6 +514,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
Message<T> msg = incomingMessages.poll();
if (msg != null) {
messageProcessed(msg);
+ if (!isValidConsumerEpoch(msg)) {
+ msgPeeked = incomingMessages.peek();
+ continue;
+ }
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
@@ -736,7 +762,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
clearReceiverQueue();
return;
}
- setClientCnx(cnx);
log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
topic, subscription, cnx.ctx().channel(), consumerId);
@@ -747,8 +772,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
SUBSCRIBE_DEADLINE_UPDATER
- .compareAndSet(this, 0L,
- System.currentTimeMillis() +
client.getConfiguration().getOperationTimeoutMs());
+ .compareAndSet(this, 0L, System.currentTimeMillis()
+ + client.getConfiguration().getOperationTimeoutMs());
int currentSize;
synchronized (this) {
@@ -780,77 +805,86 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
// startMessageRollbackDurationInSec should be consider only once when
consumer connects to first time
long startMessageRollbackDuration = (startMessageRollbackDurationInSec
> 0
- && startMessageId != null &&
startMessageId.equals(initialStartMessageId))
- ? startMessageRollbackDurationInSec : 0;
- ByteBuf request = Commands.newSubscribe(topic, subscription,
consumerId, requestId, getSubType(), priorityLevel,
- consumerName, isDurable, startMessageIdData, metadata,
readCompacted,
- conf.isReplicateSubscriptionState(),
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
- startMessageRollbackDuration, si, createTopicIfDoesNotExist,
conf.getKeySharedPolicy(),
- conf.getSubscriptionProperties());
-
- cnx.sendRequestWithId(request, requestId).thenRun(() -> {
- synchronized (ConsumerImpl.this) {
- if (changeToReadyState()) {
- consumerIsReconnectedToBroker(cnx, currentSize);
- } else {
+ && startMessageId != null
+ && startMessageId.equals(initialStartMessageId)) ?
startMessageRollbackDurationInSec : 0;
+
+ // synchronized this, because redeliverUnAckMessage eliminate the
epoch inconsistency between them
+ synchronized (this) {
+ setClientCnx(cnx);
+ ByteBuf request = Commands.newSubscribe(topic, subscription,
consumerId, requestId, getSubType(),
+ priorityLevel, consumerName, isDurable,
startMessageIdData, metadata, readCompacted,
+ conf.isReplicateSubscriptionState(),
+
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
+ startMessageRollbackDuration, si,
createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
+ // Use the current epoch to subscribe.
+ conf.getSubscriptionProperties(),
CONSUMER_EPOCH.get(this));
+
+ cnx.sendRequestWithId(request, requestId).thenRun(() -> {
+ synchronized (ConsumerImpl.this) {
+ if (changeToReadyState()) {
+ consumerIsReconnectedToBroker(cnx, currentSize);
+ } else {
+ // Consumer was closed while reconnecting, close the
connection to make sure the broker
+ // drops the consumer on its side
+ setState(State.Closed);
+ deregisterFromClientCnx();
+ client.cleanupConsumer(this);
+ cnx.channel().close();
+ return;
+ }
+ }
+
+ resetBackoff();
+
+ boolean firstTimeConnect = subscribeFuture.complete(this);
+ // if the consumer is not partitioned or is re-connected and
is partitioned, we send the flow
+ // command to receive messages.
+ if (!(firstTimeConnect && hasParentConsumer) &&
conf.getReceiverQueueSize() != 0) {
+ increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
+ }
+ }).exceptionally((e) -> {
+ deregisterFromClientCnx();
+ if (getState() == State.Closing || getState() == State.Closed)
{
// Consumer was closed while reconnecting, close the
connection to make sure the broker
// drops the consumer on its side
- setState(State.Closed);
- deregisterFromClientCnx();
- client.cleanupConsumer(this);
cnx.channel().close();
- return;
+ return null;
+ }
+ log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
+ subscription, cnx.channel().remoteAddress());
+
+ if (e.getCause() instanceof PulsarClientException
+ && PulsarClientException.isRetriableError(e.getCause())
+ && System.currentTimeMillis() <
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
+ reconnectLater(e.getCause());
+ } else if (!subscribeFuture.isDone()) {
+ // unable to create new consumer, fail operation
+ setState(State.Failed);
+ closeConsumerTasks();
+ subscribeFuture.completeExceptionally(
+ PulsarClientException.wrap(e,
String.format("Failed to subscribe the topic %s "
+ + "with subscription name %s when
connecting to the broker",
+ topicName.toString(), subscription)));
+ client.cleanupConsumer(this);
+ } else if (e.getCause() instanceof TopicDoesNotExistException)
{
+ // The topic was deleted after the consumer was created,
and we're
+ // not allowed to recreate the topic. This can happen in
few cases:
+ // * Regex consumer getting error after topic gets deleted
+ // * Regular consumer after topic is manually delete and
with
+ // auto-topic-creation set to false
+ // No more retries are needed in this case.
+ setState(State.Failed);
+ closeConsumerTasks();
+ client.cleanupConsumer(this);
+ log.warn("[{}][{}] Closed consumer because topic does not
exist anymore {}",
+ topic, subscription,
cnx.channel().remoteAddress());
+ } else {
+ // consumer was subscribed and connected but we got some
error, keep trying
+ reconnectLater(e.getCause());
}
- }
-
- resetBackoff();
-
- boolean firstTimeConnect = subscribeFuture.complete(this);
- // if the consumer is not partitioned or is re-connected and is
partitioned, we send the flow
- // command to receive messages.
- if (!(firstTimeConnect && hasParentConsumer) &&
conf.getReceiverQueueSize() != 0) {
- increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
- }
- }).exceptionally((e) -> {
- deregisterFromClientCnx();
- if (getState() == State.Closing || getState() == State.Closed) {
- // Consumer was closed while reconnecting, close the
connection to make sure the broker
- // drops the consumer on its side
- cnx.channel().close();
return null;
- }
- log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
subscription, cnx.channel().remoteAddress());
-
- if (e.getCause() instanceof PulsarClientException
- && PulsarClientException.isRetriableError(e.getCause())
- && System.currentTimeMillis() <
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
- reconnectLater(e.getCause());
- } else if (!subscribeFuture.isDone()) {
- // unable to create new consumer, fail operation
- setState(State.Failed);
- closeConsumerTasks();
- subscribeFuture.completeExceptionally(
- PulsarClientException.wrap(e, String.format("Failed to
subscribe the topic %s with subscription "
- + "name %s when connecting to the broker",
topicName.toString(), subscription)));
- client.cleanupConsumer(this);
- } else if (e.getCause() instanceof TopicDoesNotExistException) {
- // The topic was deleted after the consumer was created, and
we're
- // not allowed to recreate the topic. This can happen in few
cases:
- // * Regex consumer getting error after topic gets deleted
- // * Regular consumer after topic is manually delete and with
- // auto-topic-creation set to false
- // No more retries are needed in this case.
- setState(State.Failed);
- closeConsumerTasks();
- client.cleanupConsumer(this);
- log.warn("[{}][{}] Closed consumer because topic does not
exist anymore {}",
- topic, subscription, cnx.channel().remoteAddress());
- } else {
- // consumer was subscribed and connected but we got some
error, keep trying
- reconnectLater(e.getCause());
- }
- return null;
- });
+ });
+ }
}
protected void consumerIsReconnectedToBroker(ClientCnx cnx, int
currentQueueSize) {
@@ -1057,7 +1091,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final boolean
containMetadata,
final BitSetRecyclable
ackBitSet,
final BatchMessageAcker
acker,
- final int redeliveryCount) {
+ final int redeliveryCount,
+ final long consumerEpoch) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] processing message num - {} in batch",
subscription, consumerName, index);
}
@@ -1095,8 +1130,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final ByteBuf payloadBuffer = (singleMessagePayload != null) ?
singleMessagePayload : payload;
final MessageImpl<V> message =
MessageImpl.create(topicName.toString(), batchMessageIdImpl,
msgMetadata, singleMessageMetadata, payloadBuffer,
- createEncryptionContext(msgMetadata), cnx(), schema,
redeliveryCount, poolMessages
- );
+ createEncryptionContext(msgMetadata), cnx(), schema,
redeliveryCount, poolMessages, consumerEpoch);
message.setBrokerEntryMetadata(brokerEntryMetadata);
return message;
} catch (IOException | IllegalStateException e) {
@@ -1113,10 +1147,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final MessageMetadata
messageMetadata,
final ByteBuf payload,
final Schema<V> schema,
- final int redeliveryCount) {
+ final int redeliveryCount,
+ final long consumerEpoch) {
final MessageImpl<V> message =
MessageImpl.create(topicName.toString(), messageId, messageMetadata, payload,
- createEncryptionContext(messageMetadata), cnx(), schema,
redeliveryCount, poolMessages
- );
+ createEncryptionContext(messageMetadata), cnx(), schema,
redeliveryCount, poolMessages, consumerEpoch);
message.setBrokerEntryMetadata(brokerEntryMetadata);
return message;
}
@@ -1140,10 +1174,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final MessageIdImpl messageId,
final Schema<T> schema,
final int redeliveryCount,
- final List<Long> ackSet) {
+ final List<Long> ackSet,
+ long consumerEpoch) {
final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
final MessagePayloadContextImpl entryContext =
MessagePayloadContextImpl.get(
- brokerEntryMetadata, messageMetadata, messageId, this,
redeliveryCount, ackSet);
+ brokerEntryMetadata, messageMetadata, messageId, this,
redeliveryCount, ackSet, consumerEpoch);
final AtomicInteger skippedMessages = new AtomicInteger(0);
try {
conf.getPayloadProcessor().process(payload, entryContext, schema,
message -> {
@@ -1168,8 +1203,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
tryTriggerListener();
}
- void messageReceived(MessageIdData messageId, int redeliveryCount,
List<Long> ackSet, ByteBuf headersAndPayload,
- ClientCnx cnx) {
+ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload,
ClientCnx cnx) {
+ List<Long> ackSet = Collections.emptyList();
+ if (cmdMessage.getAckSetsCount() > 0) {
+ ackSet = new ArrayList<>(cmdMessage.getAckSetsCount());
+ for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
+ ackSet.add(cmdMessage.getAckSetAt(i));
+ }
+ }
+ int redeliveryCount = cmdMessage.getRedeliveryCount();
+ MessageIdData messageId = cmdMessage.getMessageId();
+ long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+ // if broker send messages to client with consumerEpoch, we should set
consumerEpoch to message
+ if (cmdMessage.hasConsumerEpoch()) {
+ consumerEpoch = cmdMessage.getConsumerEpoch();
+ }
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message: {}/{}", topic, subscription,
messageId.getLedgerId(),
messageId.getEntryId());
@@ -1227,8 +1275,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (conf.getPayloadProcessor() != null) {
// uncompressedPayload is released in this method so we don't need
to call release() again
- processPayloadByProcessor(
- brokerEntryMetadata, msgMetadata, uncompressedPayload,
msgId, schema, redeliveryCount, ackSet);
+ processPayloadByProcessor(brokerEntryMetadata, msgMetadata,
+ uncompressedPayload, msgId, schema, redeliveryCount,
ackSet, consumerEpoch);
return;
}
@@ -1275,7 +1323,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
final MessageImpl<T> message =
- newMessage(msgId, brokerEntryMetadata, msgMetadata,
uncompressedPayload, schema, redeliveryCount);
+ newMessage(msgId, brokerEntryMetadata, msgMetadata,
uncompressedPayload,
+ schema, redeliveryCount, consumerEpoch);
uncompressedPayload.release();
if (deadLetterPolicy != null &&
possibleSendToDeadLetterTopicMessages != null
@@ -1287,7 +1336,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
} else {
// handle batch message enqueuing; uncompressed payload has all
messages in batch
receiveIndividualMessagesFromBatch(brokerEntryMetadata,
msgMetadata, redeliveryCount, ackSet,
- uncompressedPayload, messageId, cnx);
+ uncompressedPayload, messageId, cnx, consumerEpoch);
uncompressedPayload.release();
}
@@ -1422,7 +1471,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
void receiveIndividualMessagesFromBatch(BrokerEntryMetadata
brokerEntryMetadata, MessageMetadata msgMetadata,
int redeliveryCount, List<Long>
ackSet, ByteBuf uncompressedPayload,
- MessageIdData messageId, ClientCnx
cnx) {
+ MessageIdData messageId, ClientCnx
cnx, long consumerEpoch) {
int batchSize = msgMetadata.getNumMessagesInBatch();
// create ack tracker for entry aka batch
@@ -1444,8 +1493,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
try {
for (int i = 0; i < batchSize; ++i) {
final MessageImpl<T> message = newSingleMessage(i, batchSize,
brokerEntryMetadata, msgMetadata,
- singleMessageMetadata, uncompressedPayload,
batchMessage, schema, true, ackBitSet, acker,
- redeliveryCount);
+ singleMessageMetadata, uncompressedPayload,
batchMessage, schema, true,
+ ackBitSet, acker, redeliveryCount, consumerEpoch);
if (message == null) {
skippedMessages++;
continue;
@@ -1723,6 +1772,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return getClientCnx() != null && (getState() == State.Ready);
}
+ public boolean isConnected(ClientCnx cnx) {
+ return cnx != null && (getState() == State.Ready);
+ }
+
int getPartitionIndex() {
return partitionIndex;
}
@@ -1739,29 +1792,54 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@Override
public void redeliverUnacknowledgedMessages() {
- ClientCnx cnx = cnx();
- if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v2.getValue()) {
- int currentSize = 0;
- synchronized (this) {
- currentSize = incomingMessages.size();
- clearIncomingMessages();
- unAckedMessageTracker.clear();
- }
-
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId),
cnx.ctx().voidPromise());
- if (currentSize > 0) {
- increaseAvailablePermits(cnx, currentSize);
+ // First : synchronized in order to handle consumer reconnect produce
race condition, when broker receive
+ // redeliverUnacknowledgedMessages and consumer have not be created and
+ // then receive reconnect epoch change the broker is smaller than the
client epoch, this will cause client epoch
+ // smaller than broker epoch forever. client will not receive message
anymore.
+ // Second : we should synchronized `ClientCnx cnx = cnx()` to
+ // prevent use old cnx to send redeliverUnacknowledgedMessages to a
old broker
+ synchronized (ConsumerImpl.this) {
+ ClientCnx cnx = cnx();
+ // V1 don't support redeliverUnacknowledgedMessages
+ if (cnx != null && cnx.getRemoteEndpointProtocolVersion() <
ProtocolVersion.v2.getValue()) {
+ if ((getState() == State.Connecting)) {
+ log.warn("[{}] Client Connection needs to be established "
+ + "for redelivery of unacknowledged messages",
this);
+ } else {
+ log.warn("[{}] Reconnecting the client to redeliver the
messages.", this);
+ cnx.ctx().close();
+ }
+
+ return;
}
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] [{}] Redeliver unacked messages and send
{} permits", subscription, topic,
- consumerName, currentSize);
+
+ // clear local message
+ int currentSize = 0;
+ currentSize = incomingMessages.size();
+ clearIncomingMessages();
+ unAckedMessageTracker.clear();
+
+ // we should increase epoch every time, because
MultiTopicsConsumerImpl also increase it,
+ // we need to keep both epochs the same
+ if (conf.getSubscriptionType() == SubscriptionType.Failover
+ || conf.getSubscriptionType() ==
SubscriptionType.Exclusive) {
+ CONSUMER_EPOCH.incrementAndGet(this);
+ }
+ // is channel is connected, we should send redeliver command to
broker
+ if (cnx != null && isConnected(cnx)) {
+
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
+ consumerId, CONSUMER_EPOCH.get(this)),
cnx.ctx().voidPromise());
+ if (currentSize > 0) {
+ increaseAvailablePermits(cnx, currentSize);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] [{}] Redeliver unacked messages and
send {} permits", subscription, topic,
+ consumerName, currentSize);
+ }
+ } else {
+ log.warn("[{}] Send redeliver messages command but the client
is reconnect or close, "
+ + "so don't need to send redeliver command to broker",
this);
}
- return;
- }
- if (cnx == null || (getState() == State.Connecting)) {
- log.warn("[{}] Client Connection needs to be established for
redelivery of unacknowledged messages", this);
- } else {
- log.warn("[{}] Reconnecting the client to redeliver the
messages.", this);
- cnx.ctx().close();
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 7fcc720..a184bd1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -35,6 +36,7 @@ import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import lombok.Getter;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
@@ -72,7 +74,8 @@ public class MessageImpl<T> implements Message<T> {
private BrokerEntryMetadata brokerEntryMetadata;
private boolean poolMessage;
-
+ @Getter
+ private long consumerEpoch;
// Constructor for out-going message
public static <T> MessageImpl<T> create(MessageMetadata msgMetadata,
ByteBuffer payload, Schema<T> schema,
String topic) {
@@ -98,75 +101,77 @@ public class MessageImpl<T> implements Message<T> {
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata
msgMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
- this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
schema, 0, false);
+ this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
schema, 0, false, DEFAULT_CONSUMER_EPOCH);
}
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata
msgMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount,
- boolean pooledMessage) {
+ boolean pooledMessage, long consumerEpoch) {
this.msgMetadata = new MessageMetadata();
- init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
schema, redeliveryCount, pooledMessage);
+ init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
+ schema, redeliveryCount, pooledMessage, consumerEpoch);
}
public static <T> MessageImpl<T> create(String topic, MessageIdImpl
messageId, MessageMetadata msgMetadata,
ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
ClientCnx cnx, Schema<T> schema,
- int redeliveryCount, boolean pooledMessage) {
+ int redeliveryCount, boolean pooledMessage, long consumerEpoch) {
if (pooledMessage) {
@SuppressWarnings("unchecked")
MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
init(msg, topic, messageId, msgMetadata, payload, encryptionCtx,
cnx, schema, redeliveryCount,
- pooledMessage);
+ pooledMessage, consumerEpoch);
return msg;
} else {
return new MessageImpl<>(topic, messageId, msgMetadata, payload,
encryptionCtx, cnx, schema,
- redeliveryCount, pooledMessage);
+ redeliveryCount, pooledMessage, consumerEpoch);
}
}
MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl,
MessageMetadata msgMetadata,
SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx,
- ClientCnx cnx, Schema<T> schema) {
+ ClientCnx cnx, Schema<T> schema, long consumerEpoch) {
this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata,
payload, encryptionCtx, cnx, schema, 0,
- false);
+ false, consumerEpoch);
}
MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl,
MessageMetadata batchMetadata,
SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx,
- ClientCnx cnx, Schema<T> schema, int redeliveryCount, boolean
keepMessageInDirectMemory) {
+ ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+ boolean keepMessageInDirectMemory, long consumerEpoch) {
this.msgMetadata = new MessageMetadata();
- init(this, topic, batchMessageIdImpl, batchMetadata,
singleMessageMetadata, payload, encryptionCtx, cnx, schema,
- redeliveryCount, keepMessageInDirectMemory);
+ init(this, topic, batchMessageIdImpl, batchMetadata,
singleMessageMetadata, payload, encryptionCtx,
+ cnx, schema, redeliveryCount, keepMessageInDirectMemory,
consumerEpoch);
}
public static <T> MessageImpl<T> create(String topic, BatchMessageIdImpl
batchMessageIdImpl,
MessageMetadata batchMetadata, SingleMessageMetadata
singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount,
- boolean pooledMessage) {
+ boolean pooledMessage, long consumerEpoch) {
if (pooledMessage) {
@SuppressWarnings("unchecked")
MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
init(msg, topic, batchMessageIdImpl, batchMetadata,
singleMessageMetadata, payload, encryptionCtx, cnx,
- schema, redeliveryCount, pooledMessage);
+ schema, redeliveryCount, pooledMessage, consumerEpoch);
return msg;
} else {
return new MessageImpl<>(topic, batchMessageIdImpl, batchMetadata,
singleMessageMetadata, payload,
- encryptionCtx, cnx, schema, redeliveryCount,
pooledMessage);
+ encryptionCtx, cnx, schema, redeliveryCount,
pooledMessage, consumerEpoch);
}
}
static <T> void init(MessageImpl<T> msg, String topic, MessageIdImpl
messageId, MessageMetadata msgMetadata,
ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
ClientCnx cnx, Schema<T> schema,
- int redeliveryCount, boolean poolMessage) {
+ int redeliveryCount, boolean poolMessage, long consumerEpoch) {
init(msg, topic, null /* batchMessageIdImpl */, msgMetadata, null /*
singleMessageMetadata */, payload,
- encryptionCtx, cnx, schema, redeliveryCount, poolMessage);
+ encryptionCtx, cnx, schema, redeliveryCount, poolMessage,
consumerEpoch);
msg.messageId = messageId;
}
private static <T> void init(MessageImpl<T> msg, String topic,
BatchMessageIdImpl batchMessageIdImpl,
MessageMetadata msgMetadata, SingleMessageMetadata
singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount,
- boolean poolMessage) {
+ boolean poolMessage, long consumerEpoch) {
msg.msgMetadata.clear();
msg.msgMetadata.copyFrom(msgMetadata);
msg.messageId = batchMessageIdImpl;
@@ -175,6 +180,7 @@ public class MessageImpl<T> implements Message<T> {
msg.redeliveryCount = redeliveryCount;
msg.encryptionCtx = encryptionCtx;
msg.schema = schema;
+ msg.consumerEpoch = consumerEpoch;
msg.poolMessage = poolMessage;
// If it's not pool message then need to make a copy since the passed
payload is
@@ -289,6 +295,7 @@ public class MessageImpl<T> implements Message<T> {
msg.topic = null;
msg.cnx = null;
msg.brokerEntryMetadata = brokerEntryMetadata;
+ msg.consumerEpoch = DEFAULT_CONSUMER_EPOCH;
return msg;
}
@@ -636,6 +643,7 @@ public class MessageImpl<T> implements Message<T> {
schema = null;
schemaState = SchemaState.None;
poolMessage = false;
+ consumerEpoch = DEFAULT_CONSUMER_EPOCH;
if (recyclerHandle != null) {
recyclerHandle.recycle(this);
@@ -686,6 +694,7 @@ public class MessageImpl<T> implements Message<T> {
this.redeliveryCount = 0;
this.msgMetadata = new MessageMetadata();
this.brokerEntryMetadata = new BrokerEntryMetadata();
+ this.consumerEpoch = DEFAULT_CONSUMER_EPOCH;
}
private Handle<MessageImpl<?>> recyclerHandle;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
index 443c79b..c4eb264 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import java.util.List;
@@ -51,6 +52,7 @@ public class MessagePayloadContextImpl implements
MessagePayloadContext {
private int redeliveryCount;
private BatchMessageAcker acker;
private BitSetRecyclable ackBitSet;
+ private long consumerEpoch;
private MessagePayloadContextImpl(final
Recycler.Handle<MessagePayloadContextImpl> handle) {
this.recyclerHandle = handle;
@@ -61,8 +63,10 @@ public class MessagePayloadContextImpl implements
MessagePayloadContext {
@NonNull final MessageIdImpl
messageId,
@NonNull final ConsumerImpl<?>
consumer,
final int redeliveryCount,
- final List<Long> ackSet) {
+ final List<Long> ackSet,
+ final long consumerEpoch) {
final MessagePayloadContextImpl context = RECYCLER.get();
+ context.consumerEpoch = consumerEpoch;
context.brokerEntryMetadata = brokerEntryMetadata;
context.messageMetadata = messageMetadata;
context.singleMessageMetadata = new SingleMessageMetadata();
@@ -83,6 +87,7 @@ public class MessagePayloadContextImpl implements
MessagePayloadContext {
messageId = null;
consumer = null;
redeliveryCount = 0;
+ consumerEpoch = DEFAULT_CONSUMER_EPOCH;
acker = null;
if (ackBitSet != null) {
ackBitSet.recycle();
@@ -130,7 +135,8 @@ public class MessagePayloadContextImpl implements
MessagePayloadContext {
containMetadata,
ackBitSet,
acker,
- redeliveryCount);
+ redeliveryCount,
+ consumerEpoch);
} finally {
payloadBuffer.release();
}
@@ -140,8 +146,8 @@ public class MessagePayloadContextImpl implements
MessagePayloadContext {
public <T> Message<T> asSingleMessage(MessagePayload payload, Schema<T>
schema) {
final ByteBuf payloadBuffer =
MessagePayloadUtils.convertToByteBuf(payload);
try {
- return consumer.newMessage(
- messageId, brokerEntryMetadata, messageMetadata,
payloadBuffer, schema, redeliveryCount);
+ return consumer.newMessage(messageId, brokerEntryMetadata,
+ messageMetadata, payloadBuffer, schema, redeliveryCount,
consumerEpoch);
} finally {
payloadBuffer.release();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a3e85f2..dc58ec4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -106,7 +106,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private volatile BatchMessageIdImpl startMessageId = null;
private final long startMessageRollbackDurationInSec;
-
MultiTopicsConsumerImpl(PulsarClientImpl client,
ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>>
subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean
createTopicIfDoesNotExist) {
@@ -334,6 +333,14 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
}
+ // If message consumer epoch is smaller than consumer epoch present that
+ // it has been sent to the client before the user calls
redeliverUnacknowledgedMessages, this message is invalid.
+ // so we should release this message and receive again
+ private boolean isValidConsumerEpoch(Message<T> message) {
+ return isValidConsumerEpoch(((MessageImpl<T>) (((TopicMessageImpl<T>)
message))
+ .getMessage()));
+ }
+
@Override
protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
@@ -341,6 +348,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
message = incomingMessages.take();
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
+ if (!isValidConsumerEpoch(message)) {
+ resumeReceivingFromPausedConsumersIfNeeded();
+ message.release();
+ return internalReceive();
+ }
unAckedMessageTracker.add(message.getMessageId(),
message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
@@ -352,11 +364,22 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
protected Message<T> internalReceive(int timeout, TimeUnit unit) throws
PulsarClientException {
Message<T> message;
+
+ long callTime = System.nanoTime();
try {
message = incomingMessages.poll(timeout, unit);
if (message != null) {
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
+ if (!isValidConsumerEpoch(message)) {
+ long executionTime = System.nanoTime() - callTime;
+ if (executionTime >= unit.toNanos(timeout)) {
+ return null;
+ } else {
+ resumeReceivingFromPausedConsumersIfNeeded();
+ return internalReceive((int) (timeout -
executionTime), TimeUnit.NANOSECONDS);
+ }
+ }
unAckedMessageTracker.add(message.getMessageId(),
message.getRedeliveryCount());
}
resumeReceivingFromPausedConsumersIfNeeded();
@@ -394,6 +417,10 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
Message<T> msg = incomingMessages.poll();
if (msg != null) {
decreaseIncomingMessageSize(msg);
+ if (!isValidConsumerEpoch(msg)) {
+ msgPeeked = incomingMessages.peek();
+ continue;
+ }
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
@@ -653,6 +680,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
public void redeliverUnacknowledgedMessages() {
lock.writeLock().lock();
try {
+ CONSUMER_EPOCH.incrementAndGet(this);
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index eb84faa..fe6b929 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -177,7 +177,7 @@ public class ZeroQueueConsumerImpl<T> extends
ConsumerImpl<T> {
@Override
void receiveIndividualMessagesFromBatch(BrokerEntryMetadata
brokerEntryMetadata, MessageMetadata msgMetadata,
int redeliveryCount, List<Long>
ackSet, ByteBuf uncompressedPayload,
- MessageIdData messageId, ClientCnx
cnx) {
+ MessageIdData messageId, ClientCnx
cnx, long consumerEpoch) {
log.warn(
"Closing consumer [{}]-[{}] due to unsupported received
batch-message with zero receiver queue size",
subscription, consumerName);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 12bfba1..95b21fd 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -115,6 +115,10 @@ public class Commands {
public static final int MESSAGE_SIZE_FRAME_PADDING = 10 * 1024;
public static final int INVALID_MAX_MESSAGE_SIZE = -1;
+ // this present broker version don't have consumerEpoch feature,
+ // so client don't need to think about consumerEpoch feature
+ public static final long DEFAULT_CONSUMER_EPOCH = -1L;
+
@SuppressWarnings("checkstyle:ConstantName")
public static final short magicCrc32c = 0x0e01;
@SuppressWarnings("checkstyle:ConstantName")
@@ -459,7 +463,7 @@ public class Commands {
}
public static BaseCommand newMessageCommand(long consumerId, long
ledgerId, long entryId, int partition,
- int redeliveryCount, long[] ackSet) {
+ int redeliveryCount, long[] ackSet, long consumerEpoch) {
BaseCommand cmd = localCmd(Type.MESSAGE);
CommandMessage msg = cmd.setMessage()
.setConsumerId(consumerId);
@@ -467,6 +471,11 @@ public class Commands {
.setLedgerId(ledgerId)
.setEntryId(entryId)
.setPartition(partition);
+
+ // consumerEpoch > -1 is useful
+ if (consumerEpoch > DEFAULT_CONSUMER_EPOCH) {
+ msg.setConsumerEpoch(consumerEpoch);
+ }
if (redeliveryCount > 0) {
msg.setRedeliveryCount(redeliveryCount);
}
@@ -481,8 +490,8 @@ public class Commands {
public static ByteBufPair newMessage(long consumerId, long ledgerId, long
entryId, int partition,
int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet) {
return serializeCommandMessageWithSize(
- newMessageCommand(consumerId, ledgerId, entryId, partition,
redeliveryCount, ackSet),
- metadataAndPayload);
+ newMessageCommand(consumerId, ledgerId, entryId, partition,
redeliveryCount, ackSet,
+ DEFAULT_CONSUMER_EPOCH), metadataAndPayload);
}
public static ByteBufPair newSend(long producerId, long sequenceId, int
numMessaegs, ChecksumType checksumType,
@@ -547,7 +556,7 @@ public class Commands {
return newSubscribe(topic, subscription, consumerId, requestId,
subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted,
isReplicated, subscriptionInitialPosition,
startMessageRollbackDurationInSec, schemaInfo,
createTopicIfDoesNotExist, null,
- Collections.emptyMap());
+ Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH);
}
public static ByteBuf newSubscribe(String topic, String subscription, long
consumerId, long requestId,
@@ -555,7 +564,7 @@ public class Commands {
Map<String, String> metadata, boolean readCompacted, boolean
isReplicated,
InitialPosition subscriptionInitialPosition, long
startMessageRollbackDurationInSec,
SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist,
KeySharedPolicy keySharedPolicy,
- Map<String, String> subscriptionProperties) {
+ Map<String, String> subscriptionProperties, long consumerEpoch)
{
BaseCommand cmd = localCmd(Type.SUBSCRIBE);
CommandSubscribe subscribe = cmd.setSubscribe()
.setTopic(topic)
@@ -569,7 +578,8 @@ public class Commands {
.setReadCompacted(readCompacted)
.setInitialPosition(subscriptionInitialPosition)
.setReplicateSubscriptionState(isReplicated)
- .setForceTopicCreation(createTopicIfDoesNotExist);
+ .setForceTopicCreation(createTopicIfDoesNotExist)
+ .setConsumerEpoch(consumerEpoch);
if (subscriptionProperties != null &&
!subscriptionProperties.isEmpty()) {
List<KeyValue> keyValues = new ArrayList<>();
@@ -1033,10 +1043,11 @@ public class Commands {
return serializeWithSize(cmd);
}
- public static ByteBuf newRedeliverUnacknowledgedMessages(long consumerId) {
+ public static ByteBuf newRedeliverUnacknowledgedMessages(long consumerId,
long consumerEpoch) {
BaseCommand cmd = localCmd(Type.REDELIVER_UNACKNOWLEDGED_MESSAGES);
cmd.setRedeliverUnacknowledgedMessages()
- .setConsumerId(consumerId);
+ .setConsumerId(consumerId)
+ .setConsumerEpoch(consumerEpoch);
return serializeWithSize(cmd);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index b0e343c..fcb516c 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -388,6 +388,9 @@ message CommandSubscribe {
optional KeySharedMeta keySharedMeta = 17;
repeated KeyValue subscription_properties = 18;
+
+ // The consumer epoch, when exclusive and failover consumer redeliver
unack message will increase the epoch
+ optional uint64 consumer_epoch = 19;
}
message CommandPartitionedTopicMetadata {
@@ -531,6 +534,7 @@ message CommandMessage {
required MessageIdData message_id = 2;
optional uint32 redelivery_count = 3 [default = 0];
repeated int64 ack_set = 4;
+ optional uint64 consumer_epoch = 5;
}
message CommandAck {
@@ -621,6 +625,7 @@ message CommandCloseConsumer {
message CommandRedeliverUnacknowledgedMessages {
required uint64 consumer_id = 1;
repeated MessageIdData message_ids = 2;
+ optional uint64 consumer_epoch = 3;
}
message CommandSuccess {