This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7b259131c3ed41a6b1a28f85171d908f4120535a
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 31 22:57:16 2023 +0300

    [fix][test] Fix flaky PersistentSubscriptionTest (#20434)
    
    (cherry picked from commit 242758d5770de46e506855ff881472cbc274cedb)
---
 .../persistent/PersistentSubscriptionTest.java     | 106 +++++++++++----------
 .../testcontext/NonStartableTestPulsarService.java |  17 ++++
 2 files changed, 72 insertions(+), 51 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 401f52daa62..87408598889 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -92,7 +92,12 @@ public class PersistentSubscriptionTest {
     public void setup() throws Exception {
         pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
                 .spyByDefault()
-                .configCustomizer(config -> 
config.setTransactionCoordinatorEnabled(true))
+                .configCustomizer(config -> {
+                    config.setTransactionCoordinatorEnabled(true);
+                    config.setTransactionPendingAckStoreProviderClassName(
+                            
CustomTransactionPendingAckStoreProvider.class.getName());
+                    
config.setTransactionBufferProviderClassName(InMemTransactionBufferProvider.class.getName());
+                })
                 .useTestPulsarResources()
                 .build();
 
@@ -100,56 +105,6 @@ public class PersistentSubscriptionTest {
         doReturn(Optional.of(new Policies())).when(namespaceResources)
                 .getPoliciesIfCached(any());
 
-        doReturn(new 
InMemTransactionBufferProvider()).when(pulsarTestContext.getPulsarService())
-                .getTransactionBufferProvider();
-        doReturn(new TransactionPendingAckStoreProvider() {
-            @Override
-            public CompletableFuture<PendingAckStore> 
newPendingAckStore(PersistentSubscription subscription) {
-                return CompletableFuture.completedFuture(new PendingAckStore() 
{
-                    @Override
-                    public void replayAsync(PendingAckHandleImpl 
pendingAckHandle, ExecutorService executorService) {
-                        try {
-                            Field field = 
PendingAckHandleState.class.getDeclaredField("state");
-                            field.setAccessible(true);
-                            field.set(pendingAckHandle, 
PendingAckHandleState.State.Ready);
-                        } catch (NoSuchFieldException | IllegalAccessException 
e) {
-                            fail();
-                        }
-                    }
-
-                    @Override
-                    public CompletableFuture<Void> closeAsync() {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    @Override
-                    public CompletableFuture<Void> appendIndividualAck(TxnID 
txnID, List<MutablePair<PositionImpl, Integer>> positions) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    @Override
-                    public CompletableFuture<Void> appendCumulativeAck(TxnID 
txnID, PositionImpl position) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    @Override
-                    public CompletableFuture<Void> appendCommitMark(TxnID 
txnID, AckType ackType) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    @Override
-                    public CompletableFuture<Void> appendAbortMark(TxnID 
txnID, AckType ackType) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-                });
-            }
-
-            @Override
-            public CompletableFuture<Boolean> 
checkInitializedBefore(PersistentSubscription subscription) {
-                return CompletableFuture.completedFuture(true);
-            }
-        
}).when(pulsarTestContext.getPulsarService()).getTransactionPendingAckStoreProvider();
-
         ledgerMock = mock(ManagedLedgerImpl.class);
         cursorMock = mock(ManagedCursorImpl.class);
         managedLedgerConfigMock = mock(ManagedLedgerConfig.class);
@@ -279,4 +234,53 @@ public class PersistentSubscriptionTest {
         // `acknowledgeMessage` should update cursor last active
         assertTrue(persistentSubscription.cursor.getLastActive() > 
beforeAcknowledgeTimestamp);
     }
+
+    public static class CustomTransactionPendingAckStoreProvider implements 
TransactionPendingAckStoreProvider {
+        @Override
+        public CompletableFuture<PendingAckStore> 
newPendingAckStore(PersistentSubscription subscription) {
+            return CompletableFuture.completedFuture(new PendingAckStore() {
+                @Override
+                public void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ExecutorService executorService) {
+                    try {
+                        Field field = 
PendingAckHandleState.class.getDeclaredField("state");
+                        field.setAccessible(true);
+                        field.set(pendingAckHandle, 
PendingAckHandleState.State.Ready);
+                    } catch (NoSuchFieldException | IllegalAccessException e) {
+                        fail();
+                    }
+                }
+
+                @Override
+                public CompletableFuture<Void> closeAsync() {
+                    return CompletableFuture.completedFuture(null);
+                }
+
+                @Override
+                public CompletableFuture<Void> appendIndividualAck(TxnID txnID,
+                                                                   
List<MutablePair<PositionImpl, Integer>> positions) {
+                    return CompletableFuture.completedFuture(null);
+                }
+
+                @Override
+                public CompletableFuture<Void> appendCumulativeAck(TxnID 
txnID, PositionImpl position) {
+                    return CompletableFuture.completedFuture(null);
+                }
+
+                @Override
+                public CompletableFuture<Void> appendCommitMark(TxnID txnID, 
AckType ackType) {
+                    return CompletableFuture.completedFuture(null);
+                }
+
+                @Override
+                public CompletableFuture<Void> appendAbortMark(TxnID txnID, 
AckType ackType) {
+                    return CompletableFuture.completedFuture(null);
+                }
+            });
+        }
+
+        @Override
+        public CompletableFuture<Boolean> 
checkInitializedBefore(PersistentSubscription subscription) {
+            return CompletableFuture.completedFuture(true);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 4b7762a2acf..13c4d7d72af 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.testcontext;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.mockito.Mockito.mock;
 import io.netty.channel.EventLoopGroup;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -39,6 +40,8 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -89,6 +92,20 @@ class NonStartableTestPulsarService extends 
AbstractTestPulsarService {
         } catch (PulsarServerException e) {
             throw new RuntimeException(e);
         }
+        if (config.isTransactionCoordinatorEnabled()) {
+            try {
+                setTransactionBufferProvider(TransactionBufferProvider
+                        
.newProvider(config.getTransactionBufferProviderClassName()));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            try {
+                
setTransactionPendingAckStoreProvider(TransactionPendingAckStoreProvider
+                        
.newProvider(config.getTransactionPendingAckStoreProviderClassName()));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
 
     @Override

Reply via email to