codelipenghui commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432073401
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
* @return whether this cursor is closed.
*/
boolean isClosed();
+
+ default boolean isMetadataTooLargeToPersist() {
Review Comment:
I would like suggest to change to `isCursorDataFullyPersistable()`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -366,6 +377,30 @@ protected void readMoreEntries(Consumer consumer) {
}
}
+ private boolean shouldPauseOnAckStatePersist() {
+ if (!((PersistentTopic)
subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+ return false;
+ }
+ if (cursor == null) {
+ return true;
+ }
+ if (!cursor.isMetadataTooLargeToPersist()) {
+ return false;
+ }
+ // The cursor state is too large to persist, let us check whether the
read is a replay read.
+ Range<PositionImpl> lastIndividualDeletedRange =
cursor.getLastIndividualDeletedRange();
+ if (lastIndividualDeletedRange == null) {
+ // lastIndividualDeletedRange is null means the read is not replay
read.
+ return true;
+ }
+ // If read position is less than the last acked position, it means the
read is a replay read.
+ PositionImpl lastAckedPosition =
lastIndividualDeletedRange.upperEndpoint();
+ Position readPosition = cursor.getReadPosition();
+ boolean readPositionIsSmall =
+ lastAckedPosition.compareTo(readPosition.getLedgerId(),
readPosition.getEntryId()) > 0;
+ return !readPositionIsSmall;
+ }
+
Review Comment:
Hmmm. For the single active consumer subscription. Practically, I think we
don't need to consider the ack state persistence because it should be a misuse.
Maybe we can skip the single active consumer subscription right now. And add it
later if we really need it, based on the actual needs.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -129,6 +129,13 @@ default boolean checkAndUnblockIfStuck() {
return false;
}
+ /**
+ * A callback hook after acknowledge messages.
+ * If acknowledge successfully, {@param position} will not be null, and
{@param position} and {@param ctx} will be
+ * null.
+ * If acknowledge failed. {@param position} will be null, and {@param
position} and {@param ctx} will not be null.
+ */
+ default void afterAckMessages(Object position, Throwable error, Object
ctx){}
Review Comment:
If it's inevitable. IMO, convert from the outside is better. From the API's
perspective, we should have a more clear definition
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -354,6 +369,19 @@ public synchronized void readMoreEntries() {
}
}
+ private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+ if (readType != ReadType.Normal) {
+ return false;
+ }
Review Comment:
I think the reason should be consumers can disconnect from the topic if we
stop dispatching messages to the active consumer. Somehow, the dispatching will
stop forever.
@poorbarcode could you please add some comments here? And we should also add
test for the consumer disconnection to make sure the remaining active consumers
can ultimately consume all messages.
Could you please add some comments here and we should also add test for the
consumer disconnection to make sure the remaining active consumers can
ultimately consume all messages
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -129,6 +129,13 @@ default boolean checkAndUnblockIfStuck() {
return false;
}
+ /**
+ * A callback hook after acknowledge messages.
+ * If acknowledge successfully, {@param position} will not be null, and
{@param position} and {@param ctx} will be
+ * null.
+ * If acknowledge failed. {@param position} will be null, and {@param
position} and {@param ctx} will not be null.
+ */
+ default void afterAckMessages(Object position, Throwable error, Object
ctx){}
Review Comment:
Why the type of position should be defined as Object?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -995,6 +1023,13 @@ public void addUnAckedMessages(int numberOfMessages) {
topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
}
+ @Override
+ public void afterAckMessages(Object position, Throwable error, Object ctx)
{
+ if (!cursor.isMetadataTooLargeToPersist()) {
Review Comment:
Will it waste too many CPU circles if `cursor.isMetadataTooLargeToPersist()`
is always false? This method `readMoreEntriesAsync` will be executed by another
thread and it will do lot of checks for each call.
I think what we want to do is if the dispatcher is paused by the ack state
persistence before, but now the ack state persistence is good. We should resume
the message dispatching.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]