codelipenghui commented on code in PR #21274:
URL: https://github.com/apache/pulsar/pull/21274#discussion_r1386800168


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -134,7 +140,12 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
 
     public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
 
+    private final long pendingAckInitFailureBackoffInitialTimeInMs = 5000;

Review Comment:
   This value is used as the initial backoff delay. Should we lower the 
default, for example to 100 ms?
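
   To illustrate how retries would ramp up from a 100 ms initial delay, here is 
a minimal sketch assuming a plain doubling backoff with an upper bound. The class 
`InitBackoffSketch`, its methods, and the 1000 ms cap are hypothetical and are not 
taken from this PR or from Pulsar's own backoff utilities.

```java
/**
 * Hypothetical helper (not part of Pulsar): a doubling backoff that starts at
 * initialMs and never exceeds maxMs.
 */
public final class InitBackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private long nextMs;

    public InitBackoffSketch(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("require 0 < initialMs <= maxMs");
        }
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.nextMs = initialMs;
    }

    /** Returns the delay to wait before the next retry, then doubles it up to maxMs. */
    public long next() {
        long current = nextMs;
        // Compare against maxMs / 2 first so the doubling cannot overflow.
        nextMs = (nextMs > maxMs / 2) ? maxMs : nextMs * 2;
        return current;
    }

    /** Resets the delay to the initial value, e.g. after a successful init. */
    public void reset() {
        nextMs = initialMs;
    }
}
```

   With an initial value of 100 ms the first retry happens quickly, while the 
cap still bounds the delay when failures persist.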



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##########
@@ -99,6 +104,52 @@ protected void cleanup() {
         super.internalCleanup();
     }
 
+    @Test
+    public void testUnloadSubscriptionWhenFailedInitPendingAck() throws Exception {
+        String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .subscriptionName("subName1")
+                .topic(topic)
+                .subscribe();
+        // Fail at transactionPendingAckStoreProvider::checkInitializedBefore.
+        Field transactionPendingAckStoreProviderField = PulsarService.class
+                .getDeclaredField("transactionPendingAckStoreProvider");
+        transactionPendingAckStoreProviderField.setAccessible(true);
+        TransactionPendingAckStoreProvider pendingAckStoreProvider =
+                (TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField
+                        .get(pulsarServiceList.get(0));
+        TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass());
+        // Test retryable exception.
+        when(mockProvider.checkInitializedBefore(any()))
+                .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail")))
+                .thenReturn(CompletableFuture.completedFuture(false));
+        transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
+        Awaitility.await().until(() -> {
+            pulsarClient.newConsumer()
+                    .subscriptionName("subName2")
+                    .topic(topic)
+                    .subscribe();
+            return true;
+        });
+        // Test no-retryable exception.
+        when(mockProvider.checkInitializedBefore(any()))
+                .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException
+                        .NonRecoverableLedgerException("mock fail")))
+                .thenReturn(CompletableFuture.completedFuture(false));
+        try {
+            Awaitility.await().until(() -> {

Review Comment:
   Do we really need `Awaitility` here?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##########
@@ -99,6 +104,52 @@ protected void cleanup() {
         super.internalCleanup();
     }
 
+    @Test
+    public void testUnloadSubscriptionWhenFailedInitPendingAck() throws Exception {
+        String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .subscriptionName("subName1")
+                .topic(topic)
+                .subscribe();
+        // Fail at transactionPendingAckStoreProvider::checkInitializedBefore.
+        Field transactionPendingAckStoreProviderField = PulsarService.class
+                .getDeclaredField("transactionPendingAckStoreProvider");
+        transactionPendingAckStoreProviderField.setAccessible(true);
+        TransactionPendingAckStoreProvider pendingAckStoreProvider =
+                (TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField
+                        .get(pulsarServiceList.get(0));
+        TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass());
+        // Test retryable exception.
+        when(mockProvider.checkInitializedBefore(any()))
+                .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail")))
+                .thenReturn(CompletableFuture.completedFuture(false));
+        transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
+        Awaitility.await().until(() -> {
+            pulsarClient.newConsumer()
+                    .subscriptionName("subName2")
+                    .topic(topic)
+                    .subscribe();
+            return true;
+        });
+        // Test no-retryable exception.
+        when(mockProvider.checkInitializedBefore(any()))
+                .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException
+                        .NonRecoverableLedgerException("mock fail")))
+                .thenReturn(CompletableFuture.completedFuture(false));
+        try {
+            Awaitility.await().until(() -> {
+                pulsarClient.newConsumer()
+                        .subscriptionName("subName3")
+                        .topic(topic)
+                        .subscribe();
+                return true;
+            });
+            fail();
+        } catch (Exception exception) {
+            assertTrue(exception.getCause() instanceof TimeoutException);

Review Comment:
   The consumer shouldn't get a timeout exception here. If the pending ack is 
unrecoverable, the consumer should receive the real failure reason instead of 
a timeout.
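
   As a rough sketch of the behavior the test could assert instead, the example 
below uses only the JDK: a future that fails immediately with its real cause, and 
a helper that unwraps that cause. `FailFastSketch`, `NonRecoverableException`, 
`subscribe()`, and `causeOf()` are hypothetical stand-ins, not Pulsar APIs, and 
the exception type the client would actually surface is not specified here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class FailFastSketch {

    /** Hypothetical stand-in for a non-recoverable pending ack failure. */
    public static final class NonRecoverableException extends Exception {
        public NonRecoverableException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a subscribe call that fails fast with the real reason. */
    public static CompletableFuture<Void> subscribe() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new NonRecoverableException("mock fail"));
        return future;
    }

    /** Returns the failure cause of the future, or null if it completed normally. */
    public static Throwable causeOf(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            // get() wraps the original failure; the cause is the real reason.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

   A test written this way asserts on the type and message of the cause directly 
and does not have to wait for a timeout to elapse.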



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##########
@@ -99,6 +104,52 @@ protected void cleanup() {
         super.internalCleanup();
     }
 
+    @Test
+    public void testUnloadSubscriptionWhenFailedInitPendingAck() throws Exception {
+        String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .subscriptionName("subName1")
+                .topic(topic)
+                .subscribe();
+        // Fail at transactionPendingAckStoreProvider::checkInitializedBefore.
+        Field transactionPendingAckStoreProviderField = PulsarService.class
+                .getDeclaredField("transactionPendingAckStoreProvider");
+        transactionPendingAckStoreProviderField.setAccessible(true);
+        TransactionPendingAckStoreProvider pendingAckStoreProvider =
+                (TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField
+                        .get(pulsarServiceList.get(0));
+        TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass());
+        // Test retryable exception.
+        when(mockProvider.checkInitializedBefore(any()))
+                .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail")))
+                .thenReturn(CompletableFuture.completedFuture(false));
+        transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
+        Awaitility.await().until(() -> {

Review Comment:
   Do we really need `Awaitility` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

