This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ddd9bc2fdc7238739dca043021d94c6ce3044edf
Author: fengyubiao <[email protected]>
AuthorDate: Mon Aug 28 14:45:01 2023 +0800

    [improve] [broker] Improve cache handling for partitioned topic metadata when doing lookup (#21063)
    
    Motivation: If we set `allowAutoTopicCreationType` to `PARTITIONED`, the topic creation flow is as follows (see the client-side sketch after this list):
    1. `Client-side`: Look up the topic to get the partitioned topic metadata in order to create a producer.
    2. `Broker-side`: Create the partitioned topic metadata.
    3. `Broker-side`: Respond with `{"partitions":3}`.
    4. `Client-side`: Create a separate connection for each partition of the topic.
    5. `Broker-side`: Receive 3 connection requests and create 3 partition topics.
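    
    As a concrete illustration, a minimal client-side sketch of the trigger (the class name, service URL, and topic are placeholders, not part of this patch): creating a producer performs the lookup that kicks off the whole flow.
    
    ```java
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;

    public class AutoCreateExample {
        public static void main(String[] args) throws Exception {
            // Assumes a broker configured with allowAutoTopicCreation=true,
            // allowAutoTopicCreationType=partitioned and defaultNumPartitions=3.
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650") // placeholder URL
                    .build();
            // The lookup triggered here returns {"partitions":3}; the client
            // then opens one connection per partition (tp-partition-0..2).
            Producer<byte[]> producer = client.newProducer()
                    .topic("persistent://public/default/tp") // placeholder topic
                    .create();
            producer.close();
            client.close();
        }
    }
    ```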
    
    In `step 2` above, the flow is as follows (a simplified sketch of this pattern follows the list):
    1. Check the topic auto-creation policy (here the policy is `{allowAutoTopicCreationType=PARTITIONED, defaultNumPartitions=3}`).
    2. Check whether the partitioned topic metadata already exists.
    3. Try to create the partitioned topic metadata if it does not exist.
    4. If creation failed because the partitioned topic metadata already exists (perhaps another broker is creating it concurrently), read the partitioned topic metadata from the metadata store and respond to the client.
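    
    A simplified sketch of this create-or-read-back pattern; the method name `createOrReadBack` is hypothetical, and the two async helpers are the `BrokerService` methods shown in the diff below:
    
    ```java
    // Sketch only: assumes the surrounding BrokerService context, i.e. that
    // createDefaultPartitionedTopicAsync(...) and fetchPartitionedTopicMetadataAsync(...)
    // (both shown in the diff below) are in scope, plus these imports:
    //   java.util.concurrent.CompletableFuture
    //   org.apache.pulsar.common.naming.TopicName
    //   org.apache.pulsar.common.partition.PartitionedTopicMetadata
    //   org.apache.pulsar.metadata.api.MetadataStoreException
    // The policy argument of createDefaultPartitionedTopicAsync is elided.
    CompletableFuture<PartitionedTopicMetadata> createOrReadBack(TopicName topicName) {
        CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
        createDefaultPartitionedTopicAsync(topicName)          // step 3: try to create
                .thenAccept(future::complete)
                .exceptionally(ex -> {
                    if (ex.getCause() instanceof MetadataStoreException.AlreadyExistsException) {
                        // Step 4: another broker won the race; read the metadata back.
                        // Before this patch, this read could be served from a stale cache.
                        fetchPartitionedTopicMetadataAsync(topicName)
                                .whenComplete((md, readEx) -> {
                                    if (readEx == null) {
                                        future.complete(md);
                                    } else {
                                        future.completeExceptionally(readEx);
                                    }
                                });
                    } else {
                        future.completeExceptionally(ex);
                    }
                    return null;
                });
        return future;
    }
    ```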
    
    There is a race condition that makes the client get non-partitioned metadata for the topic:
    | time | `broker-1` | `broker-2` |
    | --- | --- | --- |
    | 1 | get policy: `PARTITIONED, 3` | get policy: `PARTITIONED, 3` |
    | 2 | check whether the partitioned topic metadata already exists | check whether the partitioned topic metadata already exists |
    | 3 | partitioned topic metadata does not exist; the metadata cache caches an empty optional for the path | partitioned topic metadata does not exist; the metadata cache caches an empty optional for the path |
    | 4 |  | succeeds in creating the partitioned topic metadata |
    | 5 | receives a ZK node changed event to invalidate the cache of the partitioned topic metadata |  |
    | 6 | creating the metadata fails because it already exists |  |
    | 7 | reads the partitioned topic metadata again |  |
    
    If `step-5` executes later than `step-7`, `broker-1` will get an empty optional from the cache of the partitioned topic metadata and respond to the client with non-partitioned metadata (see the sketch after this paragraph).
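    
    Why an empty optional turns into a non-partitioned answer: `fetchPartitionedTopicMetadataAsync` maps a missing entry to a zero-partition metadata object (see `metadata.orElseGet(...)` in the diff below). A small standalone sketch of that mapping, with a hypothetical class name:
    
    ```java
    import java.util.Optional;
    import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

    public class StaleCacheExample {
        public static void main(String[] args) {
            // What broker-1 effectively computes from the stale cache entry:
            Optional<PartitionedTopicMetadata> cached = Optional.empty(); // stale cache hit
            PartitionedTopicMetadata metadata = cached.orElseGet(PartitionedTopicMetadata::new);
            // partitions defaults to 0, so the client treats the topic as non-partitioned.
            System.out.println(metadata.partitions); // prints 0
        }
    }
    ```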
    
    **What could make `step-5` execute later than `step-7`?**
    One scenario: the issue fixed by https://github.com/apache/pulsar/pull/20303, which caused `zk operations` and `zk node changed notifications` to be executed on different threads: the `main thread of the ZK client` and the `metadata store thread`.
    
    Therefore, the mechanism for looking up partitioned topic metadata is fragile, and we need to optimize it.
    
    Modifications: Before reading the partitioned topic metadata again, refresh the cache first.
    (cherry picked from commit d099ac4fa2f217b9c5f0a5e660c83048e829c5d7)
---
 .../pulsar/broker/service/BrokerService.java       |  15 ++-
 .../broker/service/BrokerServiceChaosTest.java     | 103 +++++++++++++++++++++
 2 files changed, 116 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 28ef4d35ec8..08550886ecb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2738,10 +2738,13 @@ public class BrokerService implements Closeable {
                                                 .createDefaultPartitionedTopicAsync(topicName, policies)
                                                 .thenAccept(md -> future.complete(md))
                                                 .exceptionally(ex -> {
+                                                    log.info("[{}] The partitioned topic is already"
+                                                            + " created, try to refresh the cache and read"
+                                                            + " again.", topicName);
                                                     if (ex.getCause()
                                                             instanceof MetadataStoreException.AlreadyExistsException) {
                                                         // The partitioned topic might be created concurrently
-                                                        fetchPartitionedTopicMetadataAsync(topicName)
+                                                        fetchPartitionedTopicMetadataAsync(topicName, true)
                                                                 .whenComplete((metadata2, ex2) -> {
                                                                     if (ex2 == null) {
                                                                         future.complete(metadata2);
@@ -2750,6 +2753,9 @@ public class BrokerService implements Closeable {
                                                                     }
                                                                 });
                                                     } else {
+                                                        log.error("[{}] operation of creating partitioned"
+                                                                        + " topic metadata failed",
+                                                                topicName, ex);
                                                         future.completeExceptionally(ex);
                                                     }
                                                     return null;
@@ -2789,9 +2795,14 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) {
+        return fetchPartitionedTopicMetadataAsync(topicName, false);
+    }
+
+    public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName,
+                                                                                          boolean refreshCacheAndGet) {
         // gets the number of partitions from the configuration cache
         return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                .getPartitionedTopicMetadataAsync(topicName).thenApply(metadata -> {
+                .getPartitionedTopicMetadataAsync(topicName, refreshCacheAndGet).thenApply(metadata -> {
                     // if the partitioned topic is not found in metadata, then the topic is not partitioned
                     return metadata.orElseGet(() -> new PartitionedTopicMetadata());
                 });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
new file mode 100644
index 00000000000..614b4f08370
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBaseTest {
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @Test
+    public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws Exception {
+        final String configMetadataStoreConnectString =
+                WhiteboxImpl.getInternalState(pulsar.getConfigurationMetadataStore(), "zkConnectString");
+        final ZooKeeper anotherZKCli = new ZooKeeper(configMetadataStoreConnectString, 5000, null);
+        // Set the topic auto-creation policy to PARTITIONED.
+        final String ns = defaultTenant + "/ns_" + UUID.randomUUID().toString().replaceAll("-", "");
+        final TopicName topicName1 = TopicName.get("persistent://" + ns + "/tp1");
+        final TopicName topicName2 = TopicName.get("persistent://" + ns + "/tp2");
+        admin.namespaces().createNamespace(ns);
+        AutoTopicCreationOverride autoTopicCreationOverride =
+                new AutoTopicCreationOverrideImpl.AutoTopicCreationOverrideImplBuilder().allowAutoTopicCreation(true)
+                                .topicType(TopicType.PARTITIONED.toString())
+                                .defaultNumPartitions(3).build();
+        admin.namespaces().setAutoTopicCreationAsync(ns, autoTopicCreationOverride);
+        // Make sure the cache of the namespace policy is populated.
+        admin.namespaces().getAutoSubscriptionCreation(ns);
+        // Trigger creation of the zk node "/admin/partitioned-topics/{namespace}/persistent".
+        admin.topics().createPartitionedTopic(topicName1.toString(), 2);
+        admin.topics().deletePartitionedTopic(topicName1.toString());
+
+        // Since no partitioned metadata has been created yet, the partition count of the metadata will be 0.
+        PartitionedTopicMetadata partitionedTopicMetadata1 =
+                pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2).get();
+        assertEquals(partitionedTopicMetadata1.partitions, 0);
+
+        // Create the partitioned metadata with another zk client.
+        // Inject an error so that the cache cannot be updated.
+        makeLocalMetadataStoreKeepReconnect();
+        anotherZKCli.create("/admin/partitioned-topics/" + ns + "/persistent/" + topicName2.getLocalName(),
+                "{\"partitions\":3}".getBytes(StandardCharsets.UTF_8),
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        stopLocalMetadataStoreAlwaysReconnect();
+
+        // Get the partitioned metadata from the cache; there is a ~90% chance that the partition count is 0.
+        PartitionedTopicMetadata partitionedTopicMetadata2 =
+                pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2).get();
+        // Note: to reproduce the issue, enable the assertion on the next line.
+        // assertEquals(partitionedTopicMetadata2.partitions, 0);
+
+        // Verify that the new method returns the correct result.
+        PartitionedTopicMetadata partitionedTopicMetadata3 =
+                pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2, true).get();
+        assertEquals(partitionedTopicMetadata3.partitions, 3);
+
+        // cleanup.
+        admin.topics().deletePartitionedTopic(topicName2.toString());
+        anotherZKCli.close();
+    }
+}
