This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 61d800ea3c4195abcb6aa0d30f1763263a63d84c Author: Lari Hotari <[email protected]> AuthorDate: Fri Feb 12 23:08:29 2021 +0200 [Broker] Fix race condition in BrokerService topic cache (#9565) * Fix race condition in BrokerService topic cache * Add test that reproduces the topic cache race condition * Use logger for logging exception * Address review comments - remove timeout - show thread info in log statement - add index of thread as part of thread's name * Reset state before running BrokerServiceTests that check stats (cherry picked from commit cee6377de13180805a122842816291b169e1aea2) --- .../pulsar/broker/service/BrokerService.java | 19 ++++++- .../pulsar/broker/service/BrokerServiceTest.java | 65 ++++++++++++++++++---- 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 72d6b50..f2c9e4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -756,7 +756,24 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies // Exceptional topics should be recreated. topics.remove(topic, topicFuture); } else { - return topicFuture; + // a non-existing topic in the cache shouldn't prevent creating a topic + if (createIfMissing) { + if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) { + return topicFuture; + } else { + return topicFuture.thenCompose(value -> { + if (!value.isPresent()) { + // retry and create topic + return getTopic(topic, createIfMissing); + } else { + // in-progress future completed successfully + return CompletableFuture.completedFuture(value); + } + }); + } + } else { + return topicFuture; + } } } final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 9cbc624..0af9b1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -24,11 +24,14 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -49,7 +52,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; - +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; @@ -75,23 +79,17 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; -import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; - -import lombok.Cleanup; - /** */ +@Slf4j public class BrokerServiceTest extends BrokerTestBase { private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt"; @@ -111,6 +109,13 @@ public class BrokerServiceTest extends BrokerTestBase { super.internalCleanup(); } + // method for resetting state explicitly + // this is required since setup & cleanup are using BeforeClass & AfterClass + private void resetState() throws Exception { + cleanup(); + setup(); + } + @Test public void testOwnedNsCheck() throws Exception { final String topic = "persistent://prop/ns-abc/successTopic"; @@ -147,6 +152,9 @@ public class BrokerServiceTest extends BrokerTestBase { @Test public void testBrokerServicePersistentTopicStats() throws Exception { + // this test might fail if there are stats from other tests + resetState(); + final String topicName = "persistent://prop/ns-abc/successTopic"; final String subName = "successSub"; @@ -245,6 +253,9 @@ public class BrokerServiceTest extends BrokerTestBase { @Test public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { + // this test might fail if there are stats from other tests + resetState(); + final String topicName = "persistent://prop/ns-abc/successSharedTopic"; final String subName = "successSharedSub"; @@ -382,6 +393,9 @@ public class BrokerServiceTest extends BrokerTestBase { @Test public void testBrokerServiceNamespaceStats() throws Exception { + // this test fails if there is state from other tests + resetState(); + final int numBundles = 4; final String ns1 = "prop/stats1"; final String ns2 = "prop/stats2"; @@ -1001,4 +1015,33 @@ public class BrokerServiceTest extends BrokerTestBase { } Assert.assertTrue(sb.toString().contains("test_metrics")); } + + @Test + public void shouldNotPreventCreatingTopicWhenNonexistingTopicIsCached() throws Exception { + // run multiple iterations to increase the chance of reproducing a race condition in the topic cache + for (int i = 0; i < 100; i++) { + final String topicName = "persistent://prop/ns-abc/topic-caching-test-topic" + i; + CountDownLatch latch = new CountDownLatch(1); + Thread getStatsThread = new Thread(() -> { + try { + latch.countDown(); + // create race condition with a short delay + // the bug might not reproduce in all environments, this works at least on i7-10750H CPU + Thread.sleep(1); + admin.topics().getStats(topicName); + fail("The topic should not exist yet."); + } catch (PulsarAdminException.NotFoundException e) { + // expected exception + } catch (PulsarAdminException | InterruptedException e) { + log.error("Exception in {}", Thread.currentThread().getName(), e); + } + }, "getStatsThread#" + i); + getStatsThread.start(); + latch.await(); + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + assertNotNull(producer); + getStatsThread.join(); + } + } }
