This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4b592c827e6c4fe1d80f974dbfbbed1311660d5d Author: fengyubiao <[email protected]> AuthorDate: Thu Jun 30 22:33:17 2022 +0800 [fix] [transaction] Cmd-Subscribe and Cmd-Producer will not succeed even after 100 retries (#16248) E.g. `client.lookup(by producer)`, `topic.unload`, and `consumer.subscribe` are executed at the same time: | Time | `client.lookup(by producer)` | `topic.unload` | `consumer.subscribe` | | ----------- | ----------- | ----------- | ----------- | | 1 | | | `ServerCnx.consumers.putIfAbsent(consumerId, consumerFuture)` | | 2 | get existing persistent topic | | | | 3 | create a new persistent subscription | | | | 4 | create a new pending ack handle | | | | 5 | replay async | | | | 6 | | | wait for pending ack log replay to finish | | 6 | | topic.close | | | 7 | | async close subscription | | | 8 | | change pending ack handle state --> `close` | | | 9 | change pending ack handle state `init` --> `ready` | | | | 10 | state change fails | | | | 11 | | | subscribe timeout | | 12 | | | retry subscribe | | 13 | | | get existing `consumerFuture` from `ServerCnx.consumers` | | 14 | | | wait for pending ack log replay to finish | | 15 | | | subscribe timeout | | 16 | | | ...... (loop steps 12 ~ 15) | Steps 6/14: `PersistentSubscription.addConsumer` waits for the pending ack replay to finish. https://github.com/apache/pulsar/blob/ebcc47ee7ceb43f680640ad72e51a06d9856458d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L205-L206 Step 7: Because `PersistentSubscription.addConsumer` has not finished, no consumers are closed. Step 10: A failure to change the pending-ack-handle state does not complete the `pendingAckHandleFuture`, so it never terminates. https://github.com/apache/pulsar/blob/ebcc47ee7ceb43f680640ad72e51a06d9856458d/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java#L46-L61 After step 11: Cmd-Subscribe will not succeed even after 100 retries. 
When failure to modify the pending-ack-handle-state, make `pendingAckHandleFuture` exceptionally complete. (cherry picked from commit ac7331e06e597851a75e544130c0602a780a62cd) --- .../buffer/impl/TopicTransactionBuffer.java | 11 +- .../pendingack/impl/MLPendingAckReplyCallBack.java | 7 +- .../pulsar/broker/transaction/TransactionTest.java | 179 +++++++++++++++++++++ 3 files changed, 193 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 6469ba9fb9f..67b7a24d0df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.transaction.buffer.impl; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.netty.util.Timeout; import io.netty.util.Timer; @@ -131,7 +132,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } if (!changeToReadyState()) { - log.error("[{}]Transaction buffer recover fail", topic.getName()); + log.error("[{}]Transaction buffer recover fail, current state: {}", + topic.getName(), getState()); + transactionBufferFuture.completeExceptionally + (new BrokerServiceException.ServiceUnitNotReadyException( + "Transaction buffer recover failed to change the status to Ready," + + "current state is: " + getState())); } else { timer.newTimeout(TopicTransactionBuffer.this, takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); @@ -558,7 +564,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen // we store the maxReadPosition from snapshot then open 
the non-durable cursor by this topic's manageLedger. // the non-durable cursor will read to lastConfirmedEntry. - static class TopicTransactionBufferRecover implements Runnable { + @VisibleForTesting + public static class TopicTransactionBufferRecover implements Runnable { private final PersistentTopic topic; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java index 728e7f0476b..53de549f69f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.transaction.pendingack.PendingAckReplyCallBack; import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadata; import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadataEntry; @@ -53,8 +54,10 @@ public class MLPendingAckReplyCallBack implements PendingAckReplyCallBack { log.info("Topic name : [{}], SubName : [{}] pending ack handle cache request success!", pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); } else { - log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!", - pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); + log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail! 
current state: {}", + pendingAckHandle.getTopicName(), pendingAckHandle.getSubName(), pendingAckHandle.state); + replayFailed(new BrokerServiceException.ServiceUnitNotReadyException("Failed" + + " to change PendingAckHandle state to Ready, current state is : " + pendingAckHandle.state)); } pendingAckHandle.handleCacheRequest(); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 68e73d895e3..599712a4764 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -26,8 +26,10 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -37,6 +39,7 @@ import io.netty.buffer.Unpooled; import io.netty.util.Timeout; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.List; @@ -46,9 +49,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import 
java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.Bytes; @@ -62,16 +69,27 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.broker.service.BacklogQuotaManager; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TransactionBufferSnapshotService; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; +import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack; import 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; @@ -90,6 +108,7 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.events.EventType; @@ -111,6 +130,8 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -1085,4 +1106,162 @@ public class TransactionTest extends TransactionTestBase { // repeat ack the second message, can ack successful consumer.acknowledgeAsync(messageId, txn3).get(); } + + /** + * When change pending ack handle state failure, exceptionally complete cmd-subscribe. + * see: https://github.com/apache/pulsar/pull/16248. + */ + @Test + public void testPendingAckReplayChangeStateError() throws InterruptedException, TimeoutException { + AtomicInteger atomicInteger = new AtomicInteger(1); + // Create Executor + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + // Mock serviceConfiguration. 
+ ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class); + when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true); + // Mock executorProvider. + ExecutorProvider executorProvider = mock(ExecutorProvider.class); + when(executorProvider.getExecutor()).thenReturn(executorService); + when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService); + // Mock pendingAckStore. + PendingAckStore pendingAckStore = mock(PendingAckStore.class); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + executorService.execute(()->{ + PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0]; + pendingAckHandle.close(); + MLPendingAckReplyCallBack mlPendingAckReplyCallBack + = new MLPendingAckReplyCallBack(pendingAckHandle); + mlPendingAckReplyCallBack.replayComplete(); + }); + return null; + } + }).when(pendingAckStore).replayAsync(any(), any()); + // Mock executorProvider. + TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class); + when(pendingAckStoreProvider.checkInitializedBefore(any())) + .thenReturn(CompletableFuture.completedFuture(true)); + when(pendingAckStoreProvider.newPendingAckStore(any())) + .thenReturn(CompletableFuture.completedFuture(pendingAckStore)); + // Mock pulsar. + PulsarService pulsar = mock(PulsarService.class); + when(pulsar.getConfig()).thenReturn(serviceConfiguration); + when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider); + when(pulsar.getTransactionPendingAckStoreProvider()).thenReturn(pendingAckStoreProvider); + // Mock brokerService. + BrokerService brokerService = mock(BrokerService.class); + when(brokerService.getPulsar()).thenReturn(pulsar); + // Mock topic. 
+ PersistentTopic topic = mock(PersistentTopic.class); + when(topic.getBrokerService()).thenReturn(brokerService); + when(topic.getName()).thenReturn("topic-a"); + // Mock cursor for subscription. + ManagedCursor cursor_subscription = mock(ManagedCursor.class); + doThrow(new RuntimeException("1")).when(cursor_subscription).updateLastActive(); + // Create subscription. + String subscriptionName = "sub-a"; + boolean replicated = false; + Map<String, String> subscriptionProperties = Collections.emptyMap(); + PersistentSubscription persistentSubscription = new PersistentSubscription(topic, subscriptionName, + cursor_subscription, replicated, subscriptionProperties); + org.apache.pulsar.broker.service.Consumer consumer = mock(org.apache.pulsar.broker.service.Consumer.class); + try { + CompletableFuture<Void> addConsumerFuture = persistentSubscription.addConsumer(consumer); + addConsumerFuture.get(5, TimeUnit.SECONDS); + fail("Expect failure by PendingAckHandle closed, but success"); + } catch (ExecutionException executionException){ + Throwable t = executionException.getCause(); + Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException); + } + } + + /** + * When change TB state failure, exceptionally complete cmd-producer. + * see: https://github.com/apache/pulsar/pull/16248. + */ + @Test + public void testTBRecoverChangeStateError() throws InterruptedException, TimeoutException { + final AtomicReference<PersistentTopic> persistentTopic = new AtomicReference<PersistentTopic>(); + AtomicInteger atomicInteger = new AtomicInteger(1); + // Create Executor + ScheduledExecutorService executorService_recover = mock(ScheduledExecutorService.class); + // Mock serviceConfiguration. + ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class); + when(serviceConfiguration.isEnableReplicatedSubscriptions()).thenReturn(false); + when(serviceConfiguration.isTransactionCoordinatorEnabled()).thenReturn(true); + // Mock executorProvider. 
+ ExecutorProvider executorProvider = mock(ExecutorProvider.class); + when(executorProvider.getExecutor(any(Object.class))).thenReturn(executorService_recover); + // Mock pendingAckStore. + PendingAckStore pendingAckStore = mock(PendingAckStore.class); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + new Thread(() -> { + TopicTransactionBuffer.TopicTransactionBufferRecover recover + = (TopicTransactionBuffer.TopicTransactionBufferRecover)invocation.getArguments()[0]; + TopicTransactionBufferRecoverCallBack callBack + = Whitebox.getInternalState(recover, "callBack");; + try { + persistentTopic.get().getTransactionBuffer().closeAsync().get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + callBack.recoverComplete(); + }).start(); + return null; + } + }).when(executorService_recover).execute(any()); + // Mock executorProvider. + TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class); + when(pendingAckStoreProvider.checkInitializedBefore(any())) + .thenReturn(CompletableFuture.completedFuture(true)); + when(pendingAckStoreProvider.newPendingAckStore(any())) + .thenReturn(CompletableFuture.completedFuture(pendingAckStore)); + // Mock TransactionBufferSnapshotService + TransactionBufferSnapshotService transactionBufferSnapshotService + = mock(TransactionBufferSnapshotService.class); + SystemTopicClient.Writer writer = mock(SystemTopicClient.Writer.class); + when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + when(transactionBufferSnapshotService.createWriter(any())) + .thenReturn(CompletableFuture.completedFuture(writer)); + // Mock pulsar. 
+ PulsarService pulsar = mock(PulsarService.class); + PulsarResources pulsarResources = mock(PulsarResources.class); + NamespaceResources namespaceResources = mock(NamespaceResources.class); + when(pulsarResources.getNamespaceResources()).thenReturn(namespaceResources); + when(pulsar.getPulsarResources()).thenReturn(pulsarResources); + when(pulsar.getConfiguration()).thenReturn(serviceConfiguration); + when(pulsar.getConfig()).thenReturn(serviceConfiguration); + when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider); + when(pulsar.getTransactionBufferSnapshotService()).thenReturn(transactionBufferSnapshotService); + TopicTransactionBufferProvider topicTransactionBufferProvider = new TopicTransactionBufferProvider(); + when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider); + // Mock BacklogQuotaManager + BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class); + // Mock brokerService. + BrokerService brokerService = mock(BrokerService.class); + when(brokerService.getPulsar()).thenReturn(pulsar); + when(brokerService.pulsar()).thenReturn(pulsar); + when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager); + // Mock managedLedger. + ManagedLedgerImpl managedLedger = mock(ManagedLedgerImpl.class); + ManagedCursorContainer managedCursors = new ManagedCursorContainer(); + when(managedLedger.getCursors()).thenReturn(managedCursors); + PositionImpl position = PositionImpl.EARLIEST; + when(managedLedger.getLastConfirmedEntry()).thenReturn(position); + // Create topic. + persistentTopic.set(new PersistentTopic("topic-a", managedLedger, brokerService)); + try { + // Do check. 
+ persistentTopic.get().checkIfTransactionBufferRecoverCompletely(true).get(5, TimeUnit.SECONDS); + fail("Expect failure by TB closed, but it is finished."); + } catch (ExecutionException executionException){ + Throwable t = executionException.getCause(); + Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException); + } + } }
