This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new fc1ebd3c09c [fix][broker]Fix failed consumption after loaded up a terminated topic (#24063) fc1ebd3c09c is described below commit fc1ebd3c09c6b238d827cc9eb147cfc1180c7645 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Thu Mar 13 12:42:45 2025 +0800 [fix][broker]Fix failed consumption after loaded up a terminated topic (#24063) (cherry picked from commit f6631be414559fb284ed67651346a4953903fade) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 + .../client/api/PersistentTopicTerminateTest.java | 98 ++++++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ad2435756c1..c0884ee70c2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -429,6 +429,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.debug("[{}] Opened ledger {}: {}", name, id, BKException.getMessage(rc)); } if (rc == BKException.Code.OK) { + if (State.Terminated.equals(state)) { + currentLedger = lh; + } LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(id) .setEntries(lh.getLastAddConfirmed() + 1).setSize(lh.getLength()) .setTimestamp(clock.millis()).build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java new file mode 100644 index 00000000000..69c012b7622 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +@Slf4j +public class PersistentTopicTerminateTest extends ProducerConsumerBase { + + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testRecoverAfterTerminate() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscriptionName = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + + // Trigger 2 ledgers creation. + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + producer.send("1"); + admin.topics().unload(topicName); + producer.send("2"); + + // Terminate topic. + producer.close(); + admin.topics().terminateTopic(topicName); + admin.topics().unload(topicName); + + // Verify: consume 2 msgs. + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName) + .subscriptionName(subscriptionName).subscribe(); + + Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg1); + assertEquals(msg1.getValue(), "1"); + Message<String> msg2 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg2); + assertEquals(msg2.getValue(), "2"); + + // Verify: the ledgers acked will be cleaned up. + admin.topics().skipAllMessages(topicName, subscriptionName); + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + CompletableFuture<Void> trimLedgersFuture = new CompletableFuture<>(); + ml.trimConsumedLedgersInBackground(trimLedgersFuture); + trimLedgersFuture.join(); + assertTrue(ml.getLedgersInfo().size() <= 1); + }); + + // Cleanup. + consumer.close(); + admin.topics().delete(topicName, false); + } +}