poorbarcode commented on code in PR #18315:
URL: https://github.com/apache/pulsar/pull/18315#discussion_r1014976583
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -95,6 +95,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
protected boolean sendInProgress;
+ protected boolean sendInProgressReplay;
Review Comment:
Maybe we can combine the two variables into a counter? Just like this:
```java
AtomicInteger sendingTaskCounter;
sendingTaskCounter.increment();
try {
trySendMessagesToConsumer();
} finally {
int runningTaskCount = sendingTaskCounter.decrementAndGet();
if (runningTaskCount == 0){
readMoreEntries();
}
}
```
I think this makes the logic easier to unstand.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -549,34 +550,58 @@ public final synchronized void
readEntriesComplete(List<Entry> entries, Object c
log.debug("[{}] Distributing {} messages to {} consumers", name,
entries.size(), consumerList.size());
}
+ sendMessagesToConsumers(readType, entries);
+ }
+
+ protected final synchronized void sendMessagesToConsumers(ReadType
readType, List<Entry> entries) {
// dispatch messages to a separate thread, but still in order for this
subscription
Review Comment:
There is a lot of duplicate code in the method `sendMessagesToConsumers`,
which I think should be optimized
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]