gaoran10 commented on code in PR #24189:
URL: https://github.com/apache/pulsar/pull/24189#discussion_r2154094126


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -746,4 +748,209 @@ protected void doReleaseResources() {
     public ManagedCursor getCursor() {
         return cursor;
     }
-}
+
+    @Data
+    protected static class InFlightTask {
+        Position readPos;
+        int readingEntries;
+        volatile List<Entry> entries;
+        volatile int completedEntries;
+        volatile boolean skipReadResultDueToCursorRewind;
+        final String replicatorId;
+
+        public synchronized void incCompletedEntries() {
+            if (!CollectionUtils.isEmpty(entries) && completedEntries < 
entries.size()) {
+                completedEntries++;
+            } else {
+                log.error("Unexpected calling of increase completed entries. 
{}", this.toString());
+            }
+        }
+
+        synchronized void recycle(Position readStart, int readingEntries) {
+            this.readPos = readStart;
+            this.readingEntries = readingEntries;
+            this.entries = null;
+            this.completedEntries = 0;
+            this.skipReadResultDueToCursorRewind = false;
+        }
+
+        public InFlightTask(Position readPos, int readingEntries, String 
replicatorId) {
+            this.readPos = readPos;
+            this.readingEntries = readingEntries;
+            this.replicatorId = replicatorId;
+        }
+
+        public boolean isDone() {
+            if (entries == null) {
+                return false;
+            }
+            if (entries != null && entries.isEmpty()) {
+                return true;
+            }
+            return completedEntries >= entries.size();
+        }
+
+        @Override
+        public String toString() {
+            return "Replicator InFlightTask "
+                + "{replicatorId=" + replicatorId
+                + ", readPos=" + readPos
+                + ", readingEntries=" + readingEntries
+                + ", readoutEntries=" + (entries == null ? "-1" : 
entries.size())
+                + ", completedEntries=" + completedEntries
+                + ", skipReadResultDueToCursorRewound=" + 
skipReadResultDueToCursorRewind
+                + "}";
+        }
+    }
+
+    @VisibleForTesting
+    InFlightTask createOrRecycleInFlightTaskIntoQueue(Position readPos, int 
readingEntries) {
+        synchronized (inFlightTasks) {
+            // Reuse projects that has done.
+            if (inFlightTasks.size() > 0) {
+                InFlightTask first = inFlightTasks.peek();
+                if (first.isDone()) {
+                    // Remove from the first index, and add to the latest 
index.
+                    inFlightTasks.poll();
+                    first.recycle(readPos, readingEntries);
+                    inFlightTasks.add(first);
+                    return first;
+                }
+            }
+            // New project if nothing can be reused.
+            InFlightTask task = new InFlightTask(readPos, readingEntries, 
replicatorId);
+            inFlightTasks.add(task);
+            return task;
+        }
+    }
+
+    protected InFlightTask acquirePermitsIfNotFetchingSchema() {
+        synchronized (inFlightTasks) {
+            if (hasPendingRead()) {
+                log.info("[{}] Skip the reading because there is a pending 
read task", replicatorId);
+                return null;
+            }
+            if (waitForCursorRewindingRefCnf > 0) {
+                log.info("[{}] Skip the reading due to new detected schema", 
replicatorId);
+                return null;
+            }
+            if (state != Started) {
+                log.info("[{}] Skip the reading because producer has not 
started [{}]", replicatorId, state);
+                return null;
+            }
+            // Guarantee that there is a unique cursor reading task.
+            int permits = getPermitsIfNoPendingRead();
+            if (permits == 0) {
+                return null;
+            }
+            return 
createOrRecycleInFlightTaskIntoQueue(cursor.getReadPosition(), permits);
+        }
+    }
+
+    protected int getPermitsIfNoPendingRead() {
+        synchronized (inFlightTasks) {
+            for (InFlightTask task : inFlightTasks) {
+                boolean hasPendingCursorRead = task.readPos != null && 
task.entries == null;
+                if (hasPendingCursorRead) {
+                    // Skip the current reading if there is a pending cursor 
reading.
+                    return 0;
+                }
+            }
+            return producerQueueSize - getInflightMessagesCount();
+        }
+    }
+
+    protected int getInflightMessagesCount() {
+        int inFlight = 0;
+        synchronized (inFlightTasks) {
+            for (InFlightTask task : inFlightTasks) {
+                if (task.isDone()) {
+                    continue;
+                }
+                if (task.entries == null) {
+                    inFlight += task.readingEntries;
+                    continue;
+                }
+                inFlight += Math.max(task.entries.size() - 
task.completedEntries, 0);
+            }
+        }
+        return inFlight;
+    }
+
+    protected CompletableFuture<Void> beforeDisconnect() {
+        // Ensure no in-flight task.
+        synchronized (inFlightTasks) {
+            for (PersistentReplicator.InFlightTask task : inFlightTasks) {
+                if (!task.isDone() && 
task.readPos.compareTo(cursor.getManagedLedger().getLastConfirmedEntry()) < 0) {
+                    return CompletableFuture.failedFuture(new 
BrokerServiceException
+                            .TopicBusyException("Cannot close a replicator 
with backlog"));
+                }
+            }
+            beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Disconnecting);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    protected void afterDisconnected() {
+        doRewindCursor(false);

Review Comment:
   Why rewind the cursor after being disconnected?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -317,9 +318,30 @@ public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean clos
         }
         log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId,
                 getReplicatorReadPosition(), backlog);
-        return closeProducerAsync(closeTheStartingProducer);
+        return beforeDisconnect()
+            .thenCompose(__ -> closeProducerAsync(true))
+            .thenApply(__ -> {
+                afterDisconnected();

Review Comment:
   Could we move this to the whenComplete stage?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to