congbobo184 commented on code in PR #18886:
URL: https://github.com/apache/pulsar/pull/18886#discussion_r1098781763
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -283,12 +283,22 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
     // ignore it for now and let the message dedup logic to take care of it
 } else {
     final String subscriptionName = Codec.decode(cursor.getName());
-    subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
+    PersistentSubscription subscription = createPersistentSubscription(subscriptionName, cursor,
         PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
-        cursor.getCursorProperties()));
-    // subscription-cursor gets activated by default: deactivate as there is no active subscription right
-    // now
-    subscriptions.get(subscriptionName).deactivateCursor();
+        cursor.getCursorProperties());
+    subscriptions.put(subscriptionName, subscription);
+    subscription.getInitializeFuture()
+        .exceptionally(t -> {
+            log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed "
+                + "when initialize topic [{}].", subscriptionName, topic, t);
+            if (subscriptions.remove(subscriptionName, subscription)) {
+                subscription.retryClose();
Review Comment:
Why do we need to add this method? While PersistentTopic is still initializing
the subscription, can this remove ever return false?
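
For reference, here is a minimal sketch of the conditional-remove semantics the question hinges on. It assumes the `subscriptions` map behaves like `java.util.concurrent.ConcurrentMap`; the `Sub` class and the `removeIfSame` helper are hypothetical stand-ins, not Pulsar code. `remove(key, value)` returns true only when the key is still mapped to that exact value, so it returns false if the entry was already removed or replaced in the meantime.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConditionalRemoveSketch {
    // Hypothetical stand-in for PersistentSubscription; equality is by identity.
    static final class Sub {
        final String name;
        Sub(String name) { this.name = name; }
    }

    // Removes the entry only if `name` is still mapped to `expected`.
    static boolean removeIfSame(ConcurrentMap<String, Sub> subs, String name, Sub expected) {
        return subs.remove(name, expected);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Sub> subs = new ConcurrentHashMap<>();
        Sub first = new Sub("sub-a");
        Sub second = new Sub("sub-a");

        // Case 1: the mapping is untouched, so the removal succeeds.
        subs.put("sub-a", first);
        System.out.println(removeIfSame(subs, "sub-a", first)); // prints "true"

        // Case 2: another thread replaced the entry before the callback ran.
        subs.put("sub-a", first);
        subs.put("sub-a", second);
        System.out.println(removeIfSame(subs, "sub-a", first)); // prints "false"
        System.out.println(subs.get("sub-a") == second);        // prints "true"
    }
}
```

Whether the second case can actually occur during topic construction depends on whether anything else can touch the map before the initialize future completes, which is what the question above is asking.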