void-ptr974 commented on code in PR #25915:
URL: https://github.com/apache/pulsar/pull/25915#discussion_r3409604819
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -360,20 +378,64 @@ public void readEntriesComplete(List<Entry> entries,
Object ctx) {
readFailureBackoff.reduceToHalf();
- boolean atLeastOneMessageSentForReplication = replicateEntries(entries, inFlightTask);
+ boolean producerIsWritable = isWritable();
+ replicateEntries(entries, inFlightTask, !producerIsWritable);
- if (atLeastOneMessageSentForReplication && !isWritable()) {
+ if (!producerIsWritable) {
// Don't read any more entries until the current pending entries are persisted
log.debug()
- .attr("atLeastOneMessageSentForReplication", atLeastOneMessageSentForReplication)
.attr("isWritable", isWritable())
.log("Pausing replication traffic");
} else {
readMoreEntries();
}
}
- protected abstract boolean replicateEntries(List<Entry> entries, InFlightTask inFlightTask);
+ protected void replicateEntries(List<Entry> entries, InFlightTask inFlightTask,
+ final boolean skippedReadAfterSent) {
+ latestPublishTime = System.currentTimeMillis();
+ // Release memory if terminated.
+ if (state == State.Terminated || state == State.Terminating
+ || inFlightTask.isSkipReadResultDueToCursorRewind()) {
+ for (Entry entry : entries) {
+ inFlightTask.incCompletedEntries();
+ entry.release();
+ }
+ return;
+ }
+
+ // Retry to replicate messages if it is not started.
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) cursor.getManagedLedger();
+ Runnable retryReplicateEntries = () -> {
+ ml.getScheduledExecutor().schedule(() -> {
+ ml.getExecutor().execute(() -> {
+ replicateEntries(entries, inFlightTask, skippedReadAfterSent);
+ });
+ }, 100, TimeUnit.MILLISECONDS);
+ };
+
+ // Retry.
+ if (state == Disconnecting || state == Starting) {
+ retryReplicateEntries.run();
+ return;
+ }
+ // Start producer and retry.
+ if (state == Disconnected) {
Review Comment:
This recovery path depends on reaching `replicateEntries()`, but a
disconnected replicator may not always have a pending read callback that gets
here.
Trigger scenario:
1. `disconnectIfNoTrafficAndBacklog()` disconnects the producer while
backlog is 0.
2. At that moment there is no pending `asyncReadEntriesOrWait`, or reads are
blocked by rate limits, permits, schema fetch, backoff, etc.
3. New messages are written and replication backlog appears.
4. Since the state is `Disconnected`, `readMoreEntries()` may not create a
normal read task, and `PersistentTopic.addProducer()` no longer starts
replication producers.
5. The replicator can stay `Disconnected`.
Impact:
Replication can stall and backlog can keep growing until some external event
reloads/recreates the topic or otherwise restarts the replicator. Combined with
inactive-topic GC, this also increases the risk of deleting unreplicated data.
Suggested fix:
Please add a deterministic resume path outside this read callback. For
example, the periodic inactive-replication check could restart a disconnected
replicator when backlog appears:
```java
if (state == Disconnected && getNumberOfEntriesInBacklog() > 0) {
    startProducer();
    return;
}
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3355,7 +3313,7 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) {
break;
}
// no local producers
- return hasLocalProducers();
+ return hasProducersActive() || hasActiveReplicators();
Review Comment:
I think this still needs to account for replication backlog, not only
connected replicators.
Trigger scenario:
1. A replicator is idle-disconnected when its backlog is 0.
2. New messages are written later, so the replication cursor gets backlog.
3. The replicator is still `Disconnected`, so `hasActiveReplicators()`
returns false.
4. The local producer closes, leaving the topic with no subscriptions,
producers, or connected replicators.
5. `checkGC()` can enter the delete path.
Impact:
This can delete the topic ledger while messages are still pending
replication to the remote cluster, which is a data-loss risk.
Suggested fix:
Please make inactive-topic GC explicitly protect replication backlog. For
example, this activity check could include `isReplicationBacklogExist()`, and
the GC delete path should also have a final guard before deleting so the topic
cannot be removed while any replication cursor has backlog.
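To make the idea concrete, here is a minimal, self-contained sketch of the two checks. It is not the real broker code: `GcGuardSketch`, `ReplicatorView`, `TopicView`, `hasReplicationBacklog()` and `canDelete()` are hypothetical stand-ins I made up for illustration, and the actual change would have to use whatever `PersistentTopic` and the replicators expose.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of the GC activity check; NOT the real Pulsar classes. */
public class GcGuardSketch {

    /** Hypothetical view of one replicator: connection state plus cursor backlog. */
    static final class ReplicatorView {
        final boolean connected;
        final long backlogEntries;

        ReplicatorView(boolean connected, long backlogEntries) {
            this.connected = connected;
            this.backlogEntries = backlogEntries;
        }
    }

    /** Hypothetical snapshot of the topic fields that the activity check reads. */
    static final class TopicView {
        final int localProducers;
        final List<ReplicatorView> replicators;

        TopicView(int localProducers, List<ReplicatorView> replicators) {
            this.localProducers = localProducers;
            this.replicators = replicators;
        }
    }

    /** True if any replication cursor still has entries to ship, connected or not. */
    static boolean hasReplicationBacklog(TopicView topic) {
        return topic.replicators.stream().anyMatch(r -> r.backlogEntries > 0);
    }

    /** Mirrors the connected-only check that the diff relies on today. */
    static boolean hasActiveReplicators(TopicView topic) {
        return topic.replicators.stream().anyMatch(r -> r.connected);
    }

    /** Activity check: backlog counts as activity even if the replicator is disconnected. */
    static boolean isActive(TopicView topic) {
        return topic.localProducers > 0
                || hasActiveReplicators(topic)
                || hasReplicationBacklog(topic);
    }

    /**
     * Final guard, meant to be evaluated on a fresh snapshot right before deletion,
     * because backlog can appear between the activity check and the delete.
     */
    static boolean canDelete(TopicView atDeleteTime) {
        return !isActive(atDeleteTime) && !hasReplicationBacklog(atDeleteTime);
    }

    public static void main(String[] args) {
        // Disconnected replicator with 5 pending entries: must not be deleted.
        TopicView stalled = new TopicView(0, Arrays.asList(new ReplicatorView(false, 5)));
        // Disconnected replicator with no backlog: safe to delete.
        TopicView idle = new TopicView(0, Arrays.asList(new ReplicatorView(false, 0)));
        System.out.println(canDelete(stalled)); // false
        System.out.println(canDelete(idle));    // true
    }
}
```

The backlog test appears twice on purpose: once in the activity check so GC is never scheduled, and once in the delete guard so a race between the check and the delete cannot remove unreplicated data.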