codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r776373051
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -127,6 +129,11 @@
private final String clientAddress; // IP address only, no port number
included
private final MessageId startMessageId;
+ @Getter
+ @Setter
+ private volatile long consumerEpoch = DEFAULT_READ_EPOCH;
+ public static final long DEFAULT_READ_EPOCH = 0L;
Review comment:
Can we remove this one, just provide a getter method for
`consumerEpoch`?
It looks like:
1. get epoch from consumer
2. use the epoch to read data
3. dispatch messages along with the epoch
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
##########
@@ -575,4 +595,47 @@ public boolean checkAndUnblockIfStuck() {
}
private static final Logger log =
LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
+
+ public static class ReadEntriesCallBackWrapper {
Review comment:
```suggestion
public static class ReadEntriesCtx {
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -267,7 +268,7 @@ protected void sendMessagesToConsumers(ReadType readType,
List<Entry> entries) {
consumer.sendMessages(entriesWithSameKey, batchSizes,
batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(),
- getRedeliveryTracker()).addListener(future -> {
+ getRedeliveryTracker(),
DEFAULT_READ_EPOCH).addListener(future -> {
Review comment:
Key shared also needs to avoid the out-order messages. Separately fixed
in another PR is ok, I'm not sure if there are other ordering issues with
Key_Shared subscription when redelivery messages, need more further investigated
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
##########
@@ -115,16 +117,18 @@ protected void scheduleReadOnActiveConsumer() {
}
readOnActiveConsumerTask =
topic.getBrokerService().executor().schedule(() -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Rewind cursor and read more entries after {}
ms delay", name,
-
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+ // in order to prevent redeliverUnacknowledgedMessages rewind
cursor again
+ synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Review comment:
Could you please provide more context here? I'm more sure why adding
`synchronized` here can prevent rewinding the cursor again. The Runnable
instance waiting here but after the previous one is complete, it will rewind
the cursor.
##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -604,6 +605,8 @@ message CommandCloseConsumer {
message CommandRedeliverUnacknowledgedMessages {
required uint64 consumer_id = 1;
repeated MessageIdData message_ids = 2;
+ optional uint64 consumer_epoch = 3 [default = 0];
+ optional uint64 request_id = 4;
Review comment:
@congbobo184 do we need this one?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
##########
@@ -575,4 +595,47 @@ public boolean checkAndUnblockIfStuck() {
}
private static final Logger log =
LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
+
+ public static class ReadEntriesCallBackWrapper {
Review comment:
We don't wrapper the read entry callback, just provide a combined ctx.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]