This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 242758d5770 [fix][test] Fix flaky PersistentSubscriptionTest (#20434)
242758d5770 is described below
commit 242758d5770de46e506855ff881472cbc274cedb
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 31 22:57:16 2023 +0300
[fix][test] Fix flaky PersistentSubscriptionTest (#20434)
---
.../persistent/PersistentSubscriptionTest.java | 106 +++++++++++----------
.../testcontext/NonStartableTestPulsarService.java | 17 ++++
2 files changed, 72 insertions(+), 51 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 401f52daa62..87408598889 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -92,7 +92,12 @@ public class PersistentSubscriptionTest {
public void setup() throws Exception {
pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
.spyByDefault()
- .configCustomizer(config -> config.setTransactionCoordinatorEnabled(true))
+ .configCustomizer(config -> {
+ config.setTransactionCoordinatorEnabled(true);
+ config.setTransactionPendingAckStoreProviderClassName(
+ CustomTransactionPendingAckStoreProvider.class.getName());
+ config.setTransactionBufferProviderClassName(InMemTransactionBufferProvider.class.getName());
+ })
.useTestPulsarResources()
.build();
@@ -100,56 +105,6 @@ public class PersistentSubscriptionTest {
doReturn(Optional.of(new Policies())).when(namespaceResources)
.getPoliciesIfCached(any());
- doReturn(new
InMemTransactionBufferProvider()).when(pulsarTestContext.getPulsarService())
- .getTransactionBufferProvider();
- doReturn(new TransactionPendingAckStoreProvider() {
- @Override
- public CompletableFuture<PendingAckStore>
newPendingAckStore(PersistentSubscription subscription) {
- return CompletableFuture.completedFuture(new PendingAckStore()
{
- @Override
- public void replayAsync(PendingAckHandleImpl
pendingAckHandle, ExecutorService executorService) {
- try {
- Field field =
PendingAckHandleState.class.getDeclaredField("state");
- field.setAccessible(true);
- field.set(pendingAckHandle,
PendingAckHandleState.State.Ready);
- } catch (NoSuchFieldException | IllegalAccessException
e) {
- fail();
- }
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> appendIndividualAck(TxnID
txnID, List<MutablePair<PositionImpl, Integer>> positions) {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> appendCumulativeAck(TxnID
txnID, PositionImpl position) {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> appendCommitMark(TxnID
txnID, AckType ackType) {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> appendAbortMark(TxnID
txnID, AckType ackType) {
- return CompletableFuture.completedFuture(null);
- }
- });
- }
-
- @Override
- public CompletableFuture<Boolean>
checkInitializedBefore(PersistentSubscription subscription) {
- return CompletableFuture.completedFuture(true);
- }
-
}).when(pulsarTestContext.getPulsarService()).getTransactionPendingAckStoreProvider();
-
ledgerMock = mock(ManagedLedgerImpl.class);
cursorMock = mock(ManagedCursorImpl.class);
managedLedgerConfigMock = mock(ManagedLedgerConfig.class);
@@ -279,4 +234,53 @@ public class PersistentSubscriptionTest {
// `acknowledgeMessage` should update cursor last active
assertTrue(persistentSubscription.cursor.getLastActive() >
beforeAcknowledgeTimestamp);
}
+
+ public static class CustomTransactionPendingAckStoreProvider implements
TransactionPendingAckStoreProvider {
+ @Override
+ public CompletableFuture<PendingAckStore>
newPendingAckStore(PersistentSubscription subscription) {
+ return CompletableFuture.completedFuture(new PendingAckStore() {
+ @Override
+ public void replayAsync(PendingAckHandleImpl pendingAckHandle,
ExecutorService executorService) {
+ try {
+ Field field =
PendingAckHandleState.class.getDeclaredField("state");
+ field.setAccessible(true);
+ field.set(pendingAckHandle,
PendingAckHandleState.State.Ready);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ fail();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> appendIndividualAck(TxnID txnID,
+
List<MutablePair<PositionImpl, Integer>> positions) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> appendCumulativeAck(TxnID
txnID, PositionImpl position) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> appendCommitMark(TxnID txnID,
AckType ackType) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> appendAbortMark(TxnID txnID,
AckType ackType) {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkInitializedBefore(PersistentSubscription subscription) {
+ return CompletableFuture.completedFuture(true);
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 4b7762a2acf..13c4d7d72af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.testcontext;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.Mockito.mock;
import io.netty.channel.EventLoopGroup;
+import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -39,6 +40,8 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -89,6 +92,20 @@ class NonStartableTestPulsarService extends AbstractTestPulsarService {
} catch (PulsarServerException e) {
throw new RuntimeException(e);
}
+ if (config.isTransactionCoordinatorEnabled()) {
+ try {
+ setTransactionBufferProvider(TransactionBufferProvider
+ .newProvider(config.getTransactionBufferProviderClassName()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ setTransactionPendingAckStoreProvider(TransactionPendingAckStoreProvider
+ .newProvider(config.getTransactionPendingAckStoreProviderClassName()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
@Override