This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7b259131c3ed41a6b1a28f85171d908f4120535a Author: Lari Hotari <[email protected]> AuthorDate: Wed May 31 22:57:16 2023 +0300 [fix][test] Fix flaky PersistentSubscriptionTest (#20434) (cherry picked from commit 242758d5770de46e506855ff881472cbc274cedb) --- .../persistent/PersistentSubscriptionTest.java | 106 +++++++++++---------- .../testcontext/NonStartableTestPulsarService.java | 17 ++++ 2 files changed, 72 insertions(+), 51 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 401f52daa62..87408598889 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -92,7 +92,12 @@ public class PersistentSubscriptionTest { public void setup() throws Exception { pulsarTestContext = PulsarTestContext.builderForNonStartableContext() .spyByDefault() - .configCustomizer(config -> config.setTransactionCoordinatorEnabled(true)) + .configCustomizer(config -> { + config.setTransactionCoordinatorEnabled(true); + config.setTransactionPendingAckStoreProviderClassName( + CustomTransactionPendingAckStoreProvider.class.getName()); + config.setTransactionBufferProviderClassName(InMemTransactionBufferProvider.class.getName()); + }) .useTestPulsarResources() .build(); @@ -100,56 +105,6 @@ public class PersistentSubscriptionTest { doReturn(Optional.of(new Policies())).when(namespaceResources) .getPoliciesIfCached(any()); - doReturn(new InMemTransactionBufferProvider()).when(pulsarTestContext.getPulsarService()) - .getTransactionBufferProvider(); - doReturn(new TransactionPendingAckStoreProvider() { - @Override - public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) { - return 
CompletableFuture.completedFuture(new PendingAckStore() { - @Override - public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) { - try { - Field field = PendingAckHandleState.class.getDeclaredField("state"); - field.setAccessible(true); - field.set(pendingAckHandle, PendingAckHandleState.State.Ready); - } catch (NoSuchFieldException | IllegalAccessException e) { - fail(); - } - } - - @Override - public CompletableFuture<Void> closeAsync() { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture<Void> appendIndividualAck(TxnID txnID, List<MutablePair<PositionImpl, Integer>> positions) { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, PositionImpl position) { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture<Void> appendCommitMark(TxnID txnID, AckType ackType) { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture<Void> appendAbortMark(TxnID txnID, AckType ackType) { - return CompletableFuture.completedFuture(null); - } - }); - } - - @Override - public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) { - return CompletableFuture.completedFuture(true); - } - }).when(pulsarTestContext.getPulsarService()).getTransactionPendingAckStoreProvider(); - ledgerMock = mock(ManagedLedgerImpl.class); cursorMock = mock(ManagedCursorImpl.class); managedLedgerConfigMock = mock(ManagedLedgerConfig.class); @@ -279,4 +234,53 @@ public class PersistentSubscriptionTest { // `acknowledgeMessage` should update cursor last active assertTrue(persistentSubscription.cursor.getLastActive() > beforeAcknowledgeTimestamp); } + + public static class CustomTransactionPendingAckStoreProvider implements TransactionPendingAckStoreProvider { + @Override + public CompletableFuture<PendingAckStore> 
newPendingAckStore(PersistentSubscription subscription) { + return CompletableFuture.completedFuture(new PendingAckStore() { + @Override + public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) { + try { + Field field = PendingAckHandleState.class.getDeclaredField("state"); + field.setAccessible(true); + field.set(pendingAckHandle, PendingAckHandleState.State.Ready); + } catch (NoSuchFieldException | IllegalAccessException e) { + fail(); + } + } + + @Override + public CompletableFuture<Void> closeAsync() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<Void> appendIndividualAck(TxnID txnID, + List<MutablePair<PositionImpl, Integer>> positions) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, PositionImpl position) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<Void> appendCommitMark(TxnID txnID, AckType ackType) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<Void> appendAbortMark(TxnID txnID, AckType ackType) { + return CompletableFuture.completedFuture(null); + } + }); + } + + @Override + public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) { + return CompletableFuture.completedFuture(true); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java index 4b7762a2acf..13c4d7d72af 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.testcontext; import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.mockito.Mockito.mock; import io.netty.channel.EventLoopGroup; +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -39,6 +40,8 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.storage.ManagedLedgerStorage; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -89,6 +92,20 @@ class NonStartableTestPulsarService extends AbstractTestPulsarService { } catch (PulsarServerException e) { throw new RuntimeException(e); } + if (config.isTransactionCoordinatorEnabled()) { + try { + setTransactionBufferProvider(TransactionBufferProvider + .newProvider(config.getTransactionBufferProviderClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + try { + setTransactionPendingAckStoreProvider(TransactionPendingAckStoreProvider + .newProvider(config.getTransactionPendingAckStoreProviderClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } @Override
