This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 20c2b5bd653 [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232) 20c2b5bd653 is described below commit 20c2b5bd653c9c738279f2bd41a7eaa2da797b8f Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Sun May 7 14:21:51 2023 +0800 [fix] [broker] Fix infinite ack of Replicator after topic is closed (#20232) (cherry picked from commit 98413642995eb6e562f6a591dcf56e20ac0cc7ef) --- .../service/persistent/PersistentReplicator.java | 7 ++++ .../pulsar/broker/service/ReplicatorTest.java | 45 +++++++++++++++------- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index a556237f434..d882cbf56b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -545,6 +545,13 @@ public abstract class PersistentReplicator extends AbstractReplicator public void deleteFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx, exception.getMessage(), exception); + if (exception instanceof CursorAlreadyClosedException) { + log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); + // replicator is already deleted and cursor is already closed so, producer should also be stopped + closeProducerAsync(); + return; + } if (ctx instanceof PositionImpl) { PositionImpl deletedEntry = (PositionImpl) ctx; if (deletedEntry.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) { diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 901451c022b..176eab0e94b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; @@ -51,8 +52,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -60,7 +63,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; -import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.State; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -1645,20 +1647,41 @@ public class ReplicatorTest extends ReplicatorTestBase { log.info("--- Starting 
ReplicatorTest::testReplication ---"); String namespace = "pulsar/global/ns2"; - admin1.namespaces().createNamespace(namespace); - admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1")); final TopicName dest = TopicName .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic")); @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); - log.info("--- Starting producer --- " + url1); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString(), false) + .getNow(null).get(); + final ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger(); + final ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.openCursor("pulsar.repl.r2"); + final ManagedCursorImpl spyCursor = spy(cursor); + managedLedger.getCursors().removeCursor(cursor.getName()); + managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST); + AtomicBoolean isMakeAckFail = new AtomicBoolean(false); + doAnswer(invocation -> { + Position pos = (Position) invocation.getArguments()[0]; + AsyncCallbacks.DeleteCallback cb = (AsyncCallbacks.DeleteCallback) invocation.getArguments()[1]; + Object ctx = invocation.getArguments()[2]; + if (isMakeAckFail.get()) { + log.info("async-delete {} will be failed", pos); + cb.deleteFailed(new ManagedLedgerException("mocked error"), ctx); + } else { + log.info("async-delete {} will success", pos); + cursor.asyncDelete(pos, cb, ctx); + } + return null; + }).when(spyCursor).asyncDelete(Mockito.any(Position.class), Mockito.any(AsyncCallbacks.DeleteCallback.class), + Mockito.any()); + + log.info("--- Starting producer --- " + url1); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); // Produce from cluster1 and consume from the rest producer1.produce(2); - PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopic(dest.toString(), false) - .getNow(null).get(); MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators(); @@ -1667,25 +1690,19 @@ public class ReplicatorTest extends ReplicatorTestBase { Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS) .untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started, replicator.getState())); - assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started); - ManagedCursorImpl cursor = (ManagedCursorImpl) replicator.getCursor(); // Make sure all the data has replicated to the remote cluster before close the cursor. Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition)); - cursor.setState(State.Closed); - - Field field = ManagedCursorImpl.class.getDeclaredField("state"); - field.setAccessible(true); - field.set(cursor, State.Closed); + isMakeAckFail.set(true); producer1.produce(10); // The cursor is closed, so the mark delete position will not move forward. assertEquals(cursor.getMarkDeletedPosition(), lastPosition); - field.set(cursor, State.Open); + isMakeAckFail.set(false); Awaitility.await().timeout(30, TimeUnit.SECONDS).until( () -> {