gaoran10 commented on code in PR #24189:
URL: https://github.com/apache/pulsar/pull/24189#discussion_r2154094126
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java: ########## @@ -746,4 +748,209 @@ protected void doReleaseResources() { public ManagedCursor getCursor() { return cursor; } -} + + @Data + protected static class InFlightTask { + Position readPos; + int readingEntries; + volatile List<Entry> entries; + volatile int completedEntries; + volatile boolean skipReadResultDueToCursorRewind; + final String replicatorId; + + public synchronized void incCompletedEntries() { + if (!CollectionUtils.isEmpty(entries) && completedEntries < entries.size()) { + completedEntries++; + } else { + log.error("Unexpected calling of increase completed entries. {}", this.toString()); + } + } + + synchronized void recycle(Position readStart, int readingEntries) { + this.readPos = readStart; + this.readingEntries = readingEntries; + this.entries = null; + this.completedEntries = 0; + this.skipReadResultDueToCursorRewind = false; + } + + public InFlightTask(Position readPos, int readingEntries, String replicatorId) { + this.readPos = readPos; + this.readingEntries = readingEntries; + this.replicatorId = replicatorId; + } + + public boolean isDone() { + if (entries == null) { + return false; + } + if (entries != null && entries.isEmpty()) { + return true; + } + return completedEntries >= entries.size(); + } + + @Override + public String toString() { + return "Replicator InFlightTask " + + "{replicatorId=" + replicatorId + + ", readPos=" + readPos + + ", readingEntries=" + readingEntries + + ", readoutEntries=" + (entries == null ? "-1" : entries.size()) + + ", completedEntries=" + completedEntries + + ", skipReadResultDueToCursorRewound=" + skipReadResultDueToCursorRewind + + "}"; + } + } + + @VisibleForTesting + InFlightTask createOrRecycleInFlightTaskIntoQueue(Position readPos, int readingEntries) { + synchronized (inFlightTasks) { + // Reuse projects that has done. 
+ if (inFlightTasks.size() > 0) { + InFlightTask first = inFlightTasks.peek(); + if (first.isDone()) { + // Remove from the first index, and add to the latest index. + inFlightTasks.poll(); + first.recycle(readPos, readingEntries); + inFlightTasks.add(first); + return first; + } + } + // New project if nothing can be reused. + InFlightTask task = new InFlightTask(readPos, readingEntries, replicatorId); + inFlightTasks.add(task); + return task; + } + } + + protected InFlightTask acquirePermitsIfNotFetchingSchema() { + synchronized (inFlightTasks) { + if (hasPendingRead()) { + log.info("[{}] Skip the reading because there is a pending read task", replicatorId); + return null; + } + if (waitForCursorRewindingRefCnf > 0) { + log.info("[{}] Skip the reading due to new detected schema", replicatorId); + return null; + } + if (state != Started) { + log.info("[{}] Skip the reading because producer has not started [{}]", replicatorId, state); + return null; + } + // Guarantee that there is a unique cursor reading task. + int permits = getPermitsIfNoPendingRead(); + if (permits == 0) { + return null; + } + return createOrRecycleInFlightTaskIntoQueue(cursor.getReadPosition(), permits); + } + } + + protected int getPermitsIfNoPendingRead() { + synchronized (inFlightTasks) { + for (InFlightTask task : inFlightTasks) { + boolean hasPendingCursorRead = task.readPos != null && task.entries == null; + if (hasPendingCursorRead) { + // Skip the current reading if there is a pending cursor reading. 
+ return 0; + } + } + return producerQueueSize - getInflightMessagesCount(); + } + } + + protected int getInflightMessagesCount() { + int inFlight = 0; + synchronized (inFlightTasks) { + for (InFlightTask task : inFlightTasks) { + if (task.isDone()) { + continue; + } + if (task.entries == null) { + inFlight += task.readingEntries; + continue; + } + inFlight += Math.max(task.entries.size() - task.completedEntries, 0); + } + } + return inFlight; + } + + protected CompletableFuture<Void> beforeDisconnect() { + // Ensure no in-flight task. + synchronized (inFlightTasks) { + for (PersistentReplicator.InFlightTask task : inFlightTasks) { + if (!task.isDone() && task.readPos.compareTo(cursor.getManagedLedger().getLastConfirmedEntry()) < 0) { + return CompletableFuture.failedFuture(new BrokerServiceException + .TopicBusyException("Cannot close a replicator with backlog")); + } + } + beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Disconnecting); + return CompletableFuture.completedFuture(null); + } + } + + protected void afterDisconnected() { + doRewindCursor(false); Review Comment: Why rewind the cursor after being disconnected? ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java: ########## @@ -317,9 +318,30 @@ public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean clos } log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, getReplicatorReadPosition(), backlog); - return closeProducerAsync(closeTheStartingProducer); + return beforeDisconnect() + .thenCompose(__ -> closeProducerAsync(true)) + .thenApply(__ -> { + afterDisconnected(); Review Comment: Could we move this to the whenComplete stage? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org