codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r780077192
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -546,15 +547,15 @@ public static ByteBuf newSubscribe(String topic, String
subscription, long consu
return newSubscribe(topic, subscription, consumerId, requestId,
subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted,
isReplicated, subscriptionInitialPosition,
startMessageRollbackDurationInSec, schemaInfo,
createTopicIfDoesNotExist, null,
- Collections.emptyMap());
+ Collections.emptyMap(), -1L);
}
public static ByteBuf newSubscribe(String topic, String subscription, long
consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName,
boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata, boolean readCompacted, boolean
isReplicated,
InitialPosition subscriptionInitialPosition, long
startMessageRollbackDurationInSec,
SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist,
KeySharedPolicy keySharedPolicy,
- Map<String, String> subscriptionProperties) {
+ Map<String, String> subscriptionProperties, long epoch) {
Review comment:
```suggestion
Map<String, String> subscriptionProperties, long
consumerEpoch) {
```
##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -526,6 +529,7 @@ message CommandMessage {
required MessageIdData message_id = 2;
optional uint32 redelivery_count = 3 [default = 0];
repeated int64 ack_set = 4;
+ optional uint64 consumer_epoch = 5 [default = 0];
Review comment:
I think we don't need the default value?
If the `consumer_epoch` does not present, it means the broker does not
assign a `consumer_epoch` for the message or the message from the old version
broker without the consumer epoch feature.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
##########
@@ -94,9 +96,12 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
}
+
+ // TODO: redeliver epoch
Review comment:
Could you please create an issue for tracking the task?
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
##########
@@ -937,10 +937,13 @@ public void testPooledMessageWithAckTimeout(boolean
isBatchingEnabled) throws Ex
retryStrategically((test) -> consumer.incomingMessages.peek() != null,
5, 500);
MessageImpl<ByteBuffer> msg = (MessageImpl)
consumer.incomingMessages.peek();
assertNotNull(msg);
- ByteBuf payload = ((MessageImpl) msg).getPayload();
+ ByteBuf payload = msg.getPayload();
assertNotEquals(payload.refCnt(), 0);
consumer.redeliverUnacknowledgedMessages();
- assertEquals(payload.refCnt(), 0);
+ consumer.clearIncomingMessagesAndGetMessageNumber();
+ if (payload.refCnt() != 0) {
Review comment:
We have released the payload in
`clearIncomingMessagesAndGetMessageNumber`, why need this check here? Or any
cases the payload ref count is not 0 after released the message?
##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -616,6 +620,7 @@ message CommandCloseConsumer {
message CommandRedeliverUnacknowledgedMessages {
required uint64 consumer_id = 1;
repeated MessageIdData message_ids = 2;
+ optional uint64 consumer_epoch = 3 [default = 0];
Review comment:
We don't need the default value?
##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
optional KeySharedMeta keySharedMeta = 17;
repeated KeyValue subscription_properties = 18;
+
+ // The consumer epoch, when exclusive and failover consumer redeliver
unack message will increase the epoch
+ optional uint64 epoch = 19 [default = 0];
Review comment:
```suggestion
optional uint64 consumer_epoch = 19 [default = 0];
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1492,6 +1499,7 @@ protected void
handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
if (redeliver.getMessageIdsCount() > 0 &&
Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
+ consumer.setConsumerEpoch(redeliver.getConsumerEpoch());
Review comment:
We should avoid situations where the epoch can be reduced
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -949,6 +949,11 @@ protected void handleSubscribe(final CommandSubscribe
subscribe) {
subscriptionName,
TopicOperation.CONSUME
);
+
+ // move this because we should make the sub in this channel use one
consumer future and do such as redeliver op
Review comment:
I think the comment here is not only for this change, we will see the
comment in the codebase in the future. I think the correct description is
```
Make sure the consumer future is put into the consumers map first to avoid
the same consumer ID using different consumer futures, and only remove the
consumer future from the map if subscribe failed .
```
```
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -412,13 +411,20 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
try {
message = incomingMessages.take();
messageProcessed(message);
+ if (checkMessageImplConsumerEpochIsSmallerThanConsumer(message)) {
Review comment:
```suggestion
if (!isValidConsumerEpoch(message)) {
```
More straightforward and understandable
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -127,11 +130,15 @@
private final String clientAddress; // IP address only, no port number
included
private final MessageId startMessageId;
+ @Getter
+ @Setter
+ private volatile long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+
public Consumer(Subscription subscription, SubType subType, String
topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted,
InitialPosition subscriptionInitialPosition,
- KeySharedMeta keySharedMeta, MessageId startMessageId) {
+ KeySharedMeta keySharedMeta, MessageId startMessageId,
long consumerEpoch) {
Review comment:
I noticed there are lots of changes from the tests are related to the
newly added param, it's better to keep the old constructor to avoid the many
changes for this PR.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -127,11 +130,15 @@
private final String clientAddress; // IP address only, no port number
included
private final MessageId startMessageId;
+ @Getter
+ @Setter
+ private volatile long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
Review comment:
The constructor already init the `consumerEpoch`.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -949,6 +949,11 @@ protected void handleSubscribe(final CommandSubscribe
subscribe) {
subscriptionName,
TopicOperation.CONSUME
);
+
+ // move this because we should make the sub in this channel use one
consumer future and do such as redeliver op
+ CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
+ CompletableFuture<Consumer> existingConsumerFuture =
consumers.putIfAbsent(consumerId,
+ consumerFuture);
Review comment:
How about:
```java
CompletableFuture<Consumer> existingConsumerFuture =
consumers.putIfAbsent(consumerId,
new CompletableFuture<>());
```
And, `computeIfAbsent` is more elegant here.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T>
msg) {
return executor;
}
+ // If message consumer epoch is smaller than consumer epoch present that
+ // it has been sent to the client before the user calls
redeliverUnacknowledgedMessages, this message is invalid.
+ // so we should release this message and receive again
+ protected boolean
checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {
+ if ((getSubType() == CommandSubscribe.SubType.Failover
Review comment:
We should print a warn log here, which will help with troubleshooting.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T>
msg) {
return executor;
}
+ // If message consumer epoch is smaller than consumer epoch present that
+ // it has been sent to the client before the user calls
redeliverUnacknowledgedMessages, this message is invalid.
+ // so we should release this message and receive again
+ protected boolean
checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {
+ if ((getSubType() == CommandSubscribe.SubType.Failover
+ || getSubType() == CommandSubscribe.SubType.Exclusive)
+ && (message).getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH
Review comment:
why need `(message)` here? any difference with
`message.getConsumerEpoch`?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T>
msg) {
return executor;
}
+ // If message consumer epoch is smaller than consumer epoch present that
+ // it has been sent to the client before the user calls
redeliverUnacknowledgedMessages, this message is invalid.
+ // so we should release this message and receive again
+ protected boolean
checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {
Review comment:
For more safety, we should not filter out any message for a broker with
an old version protocol version.
##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
optional KeySharedMeta keySharedMeta = 17;
repeated KeyValue subscription_properties = 18;
+
+ // The consumer epoch, when exclusive and failover consumer redeliver
unack message will increase the epoch
+ optional uint64 epoch = 19 [default = 0];
Review comment:
we don't need the default value?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1492,6 +1499,7 @@ protected void
handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
if (redeliver.getMessageIdsCount() > 0 &&
Subscription.isIndividualAckMode(consumer.subType())) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
+ consumer.setConsumerEpoch(redeliver.getConsumerEpoch());
Review comment:
For the non-persistent topic and shared subscription, we have disabled
the consumer epoch, this also means we should prevent the modification for the
consumer epoch through the message redelivery.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]