codelipenghui commented on code in PR #21274:
URL: https://github.com/apache/pulsar/pull/21274#discussion_r1374224203


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -945,12 +955,35 @@ public void completeHandleFuture() {
     }
 
     public void exceptionHandleFuture(Throwable t) {
+        if(isRetryableException(t)) {
+            long retryTime = backoff.next();
+            log.warn("[{}][{}] Failed to init transaction pending ack handler. 
It will be retried in {} Ms",
+                    persistentSubscription.getTopic().getName(), subName, 
retryTime, t);
+            transactionOpTimer.newTimeout((timeout) -> init(), retryTime, 
TimeUnit.MILLISECONDS);
+            return;
+        }
+        log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: 
{}", topicName, subName, t);
+        handleCacheRequest();
+        changeToErrorState();
+        this.pendingAckStoreFuture.completeExceptionally(new 
TransactionPendingAckException
+                
.TransactionPendingAckStoreInitException(String.format("PendingAckHandleImpl 
init fail! TopicName : "

Review Comment:
   We already defined the `TransactionPendingAckStoreInitException`. The 
message of the exception should be provided by the 
`TransactionPendingAckStoreInitException`. And I don't think we need to add the 
topic name, or subscription name here. The caller knows which topic and 
subscription they are requesting.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -945,12 +955,35 @@ public void completeHandleFuture() {
     }
 
     public void exceptionHandleFuture(Throwable t) {
+        if(isRetryableException(t)) {
+            long retryTime = backoff.next();
+            log.warn("[{}][{}] Failed to init transaction pending ack handler. 
It will be retried in {} Ms",
+                    persistentSubscription.getTopic().getName(), subName, 
retryTime, t);
+            transactionOpTimer.newTimeout((timeout) -> init(), retryTime, 
TimeUnit.MILLISECONDS);
+            return;
+        }
+        log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: 
{}", topicName, subName, t);

Review Comment:
   Please follow the same 
[format](https://github.com/apache/pulsar/pull/21274/files#diff-26056e00d43a29a9ff02a53654d692aef03e971fcec6c4f57b546afcfff2250dR960)
 instead of arbitrarily define the log format.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -945,12 +955,35 @@ public void completeHandleFuture() {
     }
 
     public void exceptionHandleFuture(Throwable t) {
+        if(isRetryableException(t)) {
+            long retryTime = backoff.next();
+            log.warn("[{}][{}] Failed to init transaction pending ack handler. 
It will be retried in {} Ms",
+                    persistentSubscription.getTopic().getName(), subName, 
retryTime, t);
+            transactionOpTimer.newTimeout((timeout) -> init(), retryTime, 
TimeUnit.MILLISECONDS);
+            return;
+        }
+        log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: 
{}", topicName, subName, t);
+        handleCacheRequest();
+        changeToErrorState();
+        this.pendingAckStoreFuture.completeExceptionally(new 
TransactionPendingAckException
+                
.TransactionPendingAckStoreInitException(String.format("PendingAckHandleImpl 
init fail! TopicName : "
+                + "%s, SubName: %s", topicName, subName), t));
         final boolean completedNow = 
this.pendingAckHandleCompletableFuture.completeExceptionally(t);
         if (completedNow) {
             recoverTime.setRecoverEndTime(System.currentTimeMillis());
         }
     }
 
+    private static boolean isRetryableException(Throwable ex) {
+        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+        return (realCause instanceof ManagedLedgerException
+                || realCause instanceof 
PulsarClientException.BrokerPersistenceException
+                || realCause instanceof PulsarClientException.LookupException
+                || realCause instanceof PulsarClientException.ConnectException)
+                && !(realCause instanceof 
ManagedLedgerException.ManagedLedgerFencedException)
+                && !(realCause instanceof 
ManagedLedgerException.NonRecoverableLedgerException);

Review Comment:
   It's really hard to read
   
   I think you want to say:
   
   ```java
   return (realCause instanceof ManagedLedgerException && !(realCause 
instanceof ManagedLedgerException.ManagedLedgerFencedException) && !(realCause 
instanceof ManagedLedgerException.NonRecoverableLedgerException))
                   || realCause instanceof 
PulsarClientException.BrokerPersistenceException
                   || realCause instanceof PulsarClientException.LookupException
                   || realCause instanceof 
PulsarClientException.ConnectException;
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##########
@@ -99,6 +104,52 @@ protected void cleanup() {
         super.internalCleanup();
     }
 
+    @Test
+    public void testUnloadSubscriptionWhenFailedInitPendingAck() throws 
Exception {
+        String topic = NAMESPACE1 + 
"/testUnloadSubscriptionWhenFailedInitPendingAck";
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .subscriptionName("subName1")
+                .topic(topic)
+                .subscribe();
+        // Fail at transactionPendingAckStoreProvider::checkInitializedBefore.
+        Field transactionPendingAckStoreProviderField = PulsarService.class
+                .getDeclaredField("transactionPendingAckStoreProvider");
+        transactionPendingAckStoreProviderField.setAccessible(true);
+        TransactionPendingAckStoreProvider pendingAckStoreProvider =
+                (TransactionPendingAckStoreProvider) 
transactionPendingAckStoreProviderField
+                        .get(pulsarServiceList.get(0));
+        TransactionPendingAckStoreProvider mockProvider = 
mock(pendingAckStoreProvider.getClass());
+        // Test retryable exception.
+        when(mockProvider.checkInitializedBefore(any()))
+                .thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail")))
+                .thenReturn(CompletableFuture.completedFuture(false));
+        transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
+        Awaitility.await().until(() -> {
+            pulsarClient.newConsumer()
+                    .subscriptionName("subName2")
+                    .topic(topic)
+                    .subscribe();
+            return true;
+        });

Review Comment:
   How the test can be passed? The retry logic will also call 
`pendingAckStoreProvider.checkInitializedBefore`; it should consistently fail 
no matter how many times you retry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to