poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1536576511
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -451,7 +472,7 @@ public void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
}
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
- brokerService.executor().schedule(this::readMoreEntries,
waitTimeMillis, TimeUnit.MILLISECONDS);
+ brokerService.executor().schedule(() -> readMoreEntries(),
waitTimeMillis, TimeUnit.MILLISECONDS);
Review Comment:
Fixed.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -188,58 +274,134 @@ protected CompletableFuture<Boolean>
isLocalTopicActive() {
}, brokerService.executor());
}
- protected synchronized CompletableFuture<Void> closeProducerAsync() {
- if (producer == null) {
- STATE_UPDATER.set(this, State.Stopped);
+ /**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC}
now.
+ * TODO "PersistentReplicator.replicateEntries" may get a
NullPointerException if this method and a
+ * "cursor.readComplete" execute concurrently.
+ */
+ @Deprecated
+ public CompletableFuture<Void> disconnect(boolean failIfHasBacklog,
boolean closeTheStartingProducer) {
+ long backlog = getNumberOfEntriesInBacklog();
+ if (failIfHasBacklog && backlog > 0) {
+ CompletableFuture<Void> disconnectFuture = new
CompletableFuture<>();
+ disconnectFuture.completeExceptionally(new
TopicBusyException("Cannot close a replicator with backlog"));
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Replicator disconnect failed since topic has
backlog", replicatorId);
+ }
+ return disconnectFuture;
+ }
+ log.info("[{}] Disconnect replicator at position {} with backlog {}",
replicatorId,
+ getReplicatorReadPosition(), backlog);
+ return closeProducerAsync(closeTheStartingProducer);
+ }
+
+ /**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC}
now.
+ * TODO "PersistentReplicator.replicateEntries" may get a
NullPointerException if this method and a
+ * "cursor.readComplete" execute concurrently.
+ */
+ @Deprecated
Review Comment:
`@Deprecated` has been removed.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -188,58 +274,134 @@ protected CompletableFuture<Boolean>
isLocalTopicActive() {
}, brokerService.executor());
}
- protected synchronized CompletableFuture<Void> closeProducerAsync() {
- if (producer == null) {
- STATE_UPDATER.set(this, State.Stopped);
+ /**
+ * @Deprecated This method only be used by {@link PersistentTopic#checkGC}
now.
+ * TODO "PersistentReplicator.replicateEntries" may get a
NullPointerException if this method and a
+ * "cursor.readComplete" execute concurrently.
+ */
+ @Deprecated
Review Comment:
`@Deprecated` has been removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]