congbobo184 commented on a change in pull request #13481:
URL: https://github.com/apache/pulsar/pull/13481#discussion_r786562199



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -130,73 +142,39 @@ public PendingAckHandleImpl(PersistentSubscription 
persistentSubscription) {
 
     private void initPendingAckStore() {
         if (changeToInitializingState()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!checkIfClose()) {
-                    this.pendingAckStoreFuture =
-                            
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                        pendingAckStore.replayAsync(this,
-                                ((PersistentTopic) 
persistentSubscription.getTopic()).getBrokerService()
-                                        
.getPulsar().getTransactionReplayExecutor());
-                    }).exceptionally(e -> {
-                        acceptQueue.clear();
-                        changeToErrorState();
-                        exceptionHandleFuture(e.getCause());
-                        log.error("PendingAckHandleImpl init fail! TopicName : 
{}, SubName: {}", topicName, subName, e);
-                        return null;
-                    });
-                }
+            if (!checkIfClose()) {
+                this.pendingAckStoreFuture =
+                        
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                    pendingAckStore.replayAsync(this,
+                            (ScheduledExecutorService) internalPinnedExecutor);
+                }).exceptionally(e -> {
+                    acceptQueue.clear();
+                    changeToErrorState();
+                    log.error("PendingAckHandleImpl init fail! TopicName : {}, 
SubName: {}", topicName, subName, e);
+                    return null;
+                });
             }
         }
     }
 
     private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
                                                         
List<MutablePair<PositionImpl, Integer>> positions,
                                                         
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
+        acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID, 
positions, completableFuture));
     }
 
-    @Override
-    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                
List<MutablePair<PositionImpl, Integer>> positions,
-                                                                boolean 
isInCacheRequest) {
-
-        if (!checkIfReady()) {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            synchronized (PendingAckHandleImpl.this) {
-                switch (state) {
-                    case Initializing:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        return completableFuture;
-                    case None:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        initPendingAckStore();
-                        return completableFuture;
-                    case Error:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                        return completableFuture;
-                    case Close:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                        return completableFuture;
-                    default:
-                        break;
-                }
-            }
-        }
-
+    public void internalIndividualAcknowledgeMessage(TxnID txnID, 
List<MutablePair<PositionImpl, Integer>> positions,
+                                                     CompletableFuture<Void> 
completableFuture) {
         if (txnID == null) {
-            return FutureUtil.failedFuture(new 
NotAllowedException("TransactionID can not be null."));
+            FutureUtil.failedFuture(new NotAllowedException("TransactionID can 
not be null."));

Review comment:
       completableFuture.completeExceptionally(new 
NotAllowedException("TransactionID can not be null."));

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -289,67 +267,68 @@ private void addIndividualAcknowledgeMessageRequest(TxnID 
txnID,
             completableFuture.completeExceptionally(e);
             return null;
         });
-        return completableFuture;
-    }
-
-    private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
-                                                        List<PositionImpl> 
positions,
-                                                        
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
     }
 
     @Override
-    public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
-                                                                
List<PositionImpl> positions,
+    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
+                                                                
List<MutablePair<PositionImpl, Integer>> positions,
                                                                 boolean 
isInCacheRequest) {

Review comment:
       delete isInCacheRequest

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -130,73 +142,39 @@ public PendingAckHandleImpl(PersistentSubscription 
persistentSubscription) {
 
     private void initPendingAckStore() {
         if (changeToInitializingState()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!checkIfClose()) {
-                    this.pendingAckStoreFuture =
-                            
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                        pendingAckStore.replayAsync(this,
-                                ((PersistentTopic) 
persistentSubscription.getTopic()).getBrokerService()
-                                        
.getPulsar().getTransactionReplayExecutor());
-                    }).exceptionally(e -> {
-                        acceptQueue.clear();
-                        changeToErrorState();
-                        exceptionHandleFuture(e.getCause());

Review comment:
       dont delete this

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -130,73 +142,39 @@ public PendingAckHandleImpl(PersistentSubscription 
persistentSubscription) {
 
     private void initPendingAckStore() {
         if (changeToInitializingState()) {
-            synchronized (PendingAckHandleImpl.this) {
-                if (!checkIfClose()) {
-                    this.pendingAckStoreFuture =
-                            
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
-                    this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                        pendingAckStore.replayAsync(this,
-                                ((PersistentTopic) 
persistentSubscription.getTopic()).getBrokerService()
-                                        
.getPulsar().getTransactionReplayExecutor());
-                    }).exceptionally(e -> {
-                        acceptQueue.clear();
-                        changeToErrorState();
-                        exceptionHandleFuture(e.getCause());
-                        log.error("PendingAckHandleImpl init fail! TopicName : 
{}, SubName: {}", topicName, subName, e);
-                        return null;
-                    });
-                }
+            if (!checkIfClose()) {
+                this.pendingAckStoreFuture =
+                        
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+                this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                    pendingAckStore.replayAsync(this,
+                            (ScheduledExecutorService) internalPinnedExecutor);
+                }).exceptionally(e -> {
+                    acceptQueue.clear();
+                    changeToErrorState();
+                    log.error("PendingAckHandleImpl init fail! TopicName : {}, 
SubName: {}", topicName, subName, e);
+                    return null;
+                });
             }
         }
     }
 
     private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
                                                         
List<MutablePair<PositionImpl, Integer>> positions,
                                                         
CompletableFuture<Void> completableFuture) {
-        acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, 
true).thenAccept(v ->
-                completableFuture.complete(null)).exceptionally(e -> {
-            completableFuture.completeExceptionally(e);
-            return null;
-        }));
+        acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID, 
positions, completableFuture));
     }
 
-    @Override
-    public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-                                                                
List<MutablePair<PositionImpl, Integer>> positions,
-                                                                boolean 
isInCacheRequest) {
-
-        if (!checkIfReady()) {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            synchronized (PendingAckHandleImpl.this) {
-                switch (state) {
-                    case Initializing:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        return completableFuture;
-                    case None:
-                        addIndividualAcknowledgeMessageRequest(txnID, 
positions, completableFuture);
-                        initPendingAckStore();
-                        return completableFuture;
-                    case Error:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
-                        return completableFuture;
-                    case Close:
-                        completableFuture.completeExceptionally(
-                                new 
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
-                        return completableFuture;
-                    default:
-                        break;
-                }
-            }
-        }
-
+    public void internalIndividualAcknowledgeMessage(TxnID txnID, 
List<MutablePair<PositionImpl, Integer>> positions,
+                                                     CompletableFuture<Void> 
completableFuture) {
         if (txnID == null) {
-            return FutureUtil.failedFuture(new 
NotAllowedException("TransactionID can not be null."));
+            FutureUtil.failedFuture(new NotAllowedException("TransactionID can 
not be null."));
+            return;
+
         }
         if (positions == null) {
-            return FutureUtil.failedFuture(new NotAllowedException("Positions 
can not be null."));
+            FutureUtil.failedFuture(new NotAllowedException("Positions can not 
be null."));

Review comment:
       completableFuture.completeExceptionally(new 
NotAllowedException("Positions can not be null."));




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to