lifepuzzlefun commented on code in PR #20394:
URL: https://github.com/apache/pulsar/pull/20394#discussion_r1208357469


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2820,13 +2827,23 @@ public void operationComplete() {
 
                 @Override
                 public void operationFailed(ManagedLedgerException exception) {
+                    if (!isCbCompleted.compareAndSet(false, true)) {
+                        log.info("[{}] Persist position {} already timed out 
for cursor {}", ledger.getName(),
+                                mdEntry.newPosition, name);
+                        return;
+                    }
                     log.warn("[{}] Failed to persist position {} for cursor 
{}", ledger.getName(),
                             mdEntry.newPosition, name);
 
                     deleteLedgerAsync(newLedgerHandle);
                     callback.operationFailed(exception);
                 }
-            });
+            };
+            // persist position completes the cursor recovery callback. it's 
important to retry if callabck doesn't
+            // complete else it can create unavailability for cursor read
+            persistPositionToLedgerWithRetry(newLedgerHandle, mdEntry, 
persistPositionCb, isCbCompleted,
+                    AsyncOperationTimeoutSeconds);
+            persistPositionToLedger(newLedgerHandle, mdEntry, 
persistPositionCb);

Review Comment:
   maybe this line for persistentPositionToLedger can be removed



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2835,6 +2852,21 @@ public void operationFailed(ManagedLedgerException 
exception) {
         });
     }
 
+    @VisibleForTesting
+    protected void persistPositionToLedgerWithRetry(LedgerHandle lh, 
MarkDeleteEntry mdEntry,
+            VoidCallback persistPositionCb, AtomicBoolean isCbCompleted, int 
timeoutSec) {
+        persistPositionToLedger(lh, mdEntry, persistPositionCb);
+        ledger.getScheduledExecutor().schedule(() -> {
+            if (isCbCompleted.get()) {
+                return;
+            }
+            log.info("[{}] couldn't persist position {} in timeout {} sec for 
cursor {}, retrying..", ledger.getName(),
+                    mdEntry.newPosition, timeoutSec, name);
+            // keep retrying to recover cursor and enable reads
+            persistPositionToLedgerWithRetry(lh, mdEntry, persistPositionCb, 
isCbCompleted, timeoutSec);

Review Comment:
   if this will retry again i wonder if the ManagedCursorImpl.this lock should 
be acquired.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to