codelipenghui commented on code in PR #17164:
URL: https://github.com/apache/pulsar/pull/17164#discussion_r972872271


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,26 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        ManagedCursorImpl.this.cursorProperties = newProperties;

Review Comment:
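   Do we also need to add the following here, so that the cached `managedCursorInfo` stays in sync with what was just written?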
   
   ```java
   ManagedCursorImpl.this.managedCursorInfo = cursorInfo;
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -324,24 +328,26 @@ public Map<String, String> getCursorProperties() {
         return cursorProperties;
     }
 
-    @Override
-    public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
+    private CompletableFuture<Void> computeCursorProperties(
+            final Function<Map<String, String>, Map<String, String>> updateFunction) {
         CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
-        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
-            @Override
-            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                ManagedCursorInfo copy = ManagedCursorInfo
-                        .newBuilder(info)
-                        .clearCursorProperties()
-                        .addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
-                        .build();
-                ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
-                        name, copy, stat, new MetaStoreCallback<>() {
+
+        final Stat lastCursorLedgerStat = ManagedCursorImpl.this.cursorLedgerStat;
+
+        Map<String, String> newProperties = updateFunction.apply(ManagedCursorImpl.this.cursorProperties);
+        ManagedCursorInfo copy = ManagedCursorInfo
+                .newBuilder(ManagedCursorImpl.this.managedCursorInfo)
+                .clearCursorProperties()
+                .addAllCursorProperties(buildStringPropertiesMap(newProperties))
+                .build();
+
+        ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
+                name, copy, lastCursorLedgerStat, new MetaStoreCallback<>() {
                     @Override
                     public void operationComplete(Void result, Stat stat) {
                         log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
                                 name, cursorProperties);
-                        ManagedCursorImpl.this.cursorProperties = cursorProperties;
+                        ManagedCursorImpl.this.cursorProperties = newProperties;

Review Comment:
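   Please also add a test for this case.
   If we miss this assignment, the `persistPositionMetaStore` method would write the old `ManagedCursorInfo` to ZooKeeper while using the correct `cursorLedgerStat`, so the version check would pass and the stale data would overwrite the newly stored properties.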
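
To illustrate the concern, here is a minimal, self-contained model of the failure mode. It is only a sketch: `Info`, `Store`, `Cursor`, and the `refreshCachedInfo` flag are hypothetical stand-ins, not Pulsar classes, and the real `persistPositionMetaStore` may build its payload differently. The model assumes the cursor caches both the last written info and the store version, and that a later position update is built from the cached info.

```java
import java.util.Map;

public final class StaleCursorInfoSketch {

    /** Hypothetical stand-in for ManagedCursorInfo: a position plus properties. */
    static final class Info {
        final long position;
        final Map<String, String> properties;

        Info(long position, Map<String, String> properties) {
            this.position = position;
            this.properties = properties;
        }
    }

    /** Hypothetical stand-in for the metadata store: a versioned compare-and-set. */
    static final class Store {
        Info info = new Info(0, Map.of());
        int version = 0;

        int update(Info newInfo, int expectedVersion) {
            if (expectedVersion != version) {
                throw new IllegalStateException("bad version");
            }
            info = newInfo;
            return ++version;
        }
    }

    /** Hypothetical cursor that caches the last info and version it wrote. */
    static final class Cursor {
        final Store store;
        final boolean refreshCachedInfo;
        Info cachedInfo;
        int cachedVersion;

        Cursor(Store store, boolean refreshCachedInfo) {
            this.store = store;
            this.refreshCachedInfo = refreshCachedInfo;
            this.cachedInfo = store.info;
            this.cachedVersion = store.version;
        }

        void setProperties(Map<String, String> properties) {
            Info copy = new Info(cachedInfo.position, Map.copyOf(properties));
            cachedVersion = store.update(copy, cachedVersion);
            if (refreshCachedInfo) {
                cachedInfo = copy; // the assignment the review asks about
            }
        }

        void persistPosition(long position) {
            // Built from the cached info, so stale properties are written back.
            Info copy = new Info(position, cachedInfo.properties);
            cachedVersion = store.update(copy, cachedVersion);
            cachedInfo = copy;
        }
    }

    /** Sets a property, persists a position, and returns what the store ends up holding. */
    public static Map<String, String> run(boolean refreshCachedInfo) {
        Store store = new Store();
        Cursor cursor = new Cursor(store, refreshCachedInfo);
        cursor.setProperties(Map.of("owner", "a"));
        cursor.persistPosition(42);
        return store.info.properties;
    }

    public static void main(String[] args) {
        System.out.println("without refresh: " + run(false));
        System.out.println("with refresh:    " + run(true));
    }
}
```

In this model the version check passes in both runs, which is why the lost update is silent when the cached info is not refreshed. A test along these lines (set properties, persist a position, then read the properties back from the store) would cover the case.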



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java:
##########
@@ -68,4 +69,28 @@ public static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> f
 
         return compositeFuture;
     }
+
+    public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
+                                                            Class<? extends Exception> needRetryExceptionClass) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        op.get().whenComplete((res, ex) -> {
+            if (ex == null) {
+                resultFuture.complete(res);
+            } else {
+                if (needRetryExceptionClass.isAssignableFrom(ex.getClass())) {
+                    op.get().whenComplete((res2, ex2) -> {
+                        if (ex2 == null) {
+                            resultFuture.complete(res2);
+                        } else {
+                            resultFuture.completeExceptionally(ex2);
+                        }
+                    });

Review Comment:
   Will it only retry once? As written, if the second attempt also fails, the error goes straight to the caller.
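
If more than one retry is wanted, a bounded variant could look roughly like the sketch below. This is an illustration under assumptions, not a proposal for the exact code: the class name `RetrySketch`, the `maxRetries` parameter, and the `attempt` helper are hypothetical. It also unwraps `CompletionException` before matching, since a failure coming from a dependent stage arrives wrapped and would not match the retryable class otherwise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public final class RetrySketch {

    private RetrySketch() {
    }

    /** Runs op, then re-runs it up to maxRetries more times while it fails with retryOn. */
    public static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
                                                            Class<? extends Exception> retryOn,
                                                            int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(op, retryOn, maxRetries, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> op,
                                    Class<? extends Exception> retryOn,
                                    int retriesLeft,
                                    CompletableFuture<T> result) {
        op.get().whenComplete((res, ex) -> {
            if (ex == null) {
                result.complete(res);
                return;
            }
            // Dependent stages wrap failures in CompletionException, so unwrap before matching.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            if (retriesLeft > 0 && retryOn.isInstance(cause)) {
                attempt(op, retryOn, retriesLeft - 1, result);
            } else {
                result.completeExceptionally(cause);
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice with the retryable exception, then succeeds on the third call.
        CompletableFuture<String> future = executeWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                return CompletableFuture.<String>failedFuture(new IllegalStateException("bad version"));
            }
            return CompletableFuture.completedFuture("ok");
        }, IllegalStateException.class, 3);
        System.out.println(future.join() + " after " + calls.get() + " calls");
    }
}
```

Passing `maxRetries = 1` reproduces the single-retry behavior in the diff, so the existing call sites would not need to change semantics.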


