This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c4c293c405d69074513a8b3d20af512caaca59c0
Author: fengyubiao <[email protected]>
AuthorDate: Thu May 30 16:42:26 2024 +0800

    [improve] [client] improve the class GetTopicsResult (#22766)
    
    (cherry picked from commit 87a33399873ff1e9723a6ca3812cbf914d8c8eef)
---
 .../pulsar/client/impl/LookupServiceTest.java      | 128 +++++++++++++++++++++
 .../client/impl/BinaryProtoLookupService.java      |  14 +--
 .../pulsar/client/impl/HttpLookupService.java      |  13 +--
 .../pulsar/common/lookup/GetTopicsResult.java      | 106 +++++++++++++++--
 4 files changed, 225 insertions(+), 36 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
new file mode 100644
index 00000000000..59cb7ae03d0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.Collection;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+@Slf4j
+public class LookupServiceTest extends ProducerConsumerBase {
+
+    private PulsarClientImpl clientWithHttpLookup;
+    private PulsarClientImpl clientWitBinaryLookup;
+
+    private boolean enableBrokerSideSubscriptionPatternEvaluation = true;
+    private int subscriptionPatternMaxLength = 10_000;
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        clientWithHttpLookup =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+        clientWitBinaryLookup =
+                (PulsarClientImpl) 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (clientWithHttpLookup != null) {
+            clientWithHttpLookup.close();
+        }
+        if (clientWitBinaryLookup != null) {
+            clientWitBinaryLookup.close();
+        }
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        
conf.setEnableBrokerSideSubscriptionPatternEvaluation(enableBrokerSideSubscriptionPatternEvaluation);
+        conf.setSubscriptionPatternMaxLength(subscriptionPatternMaxLength);
+    }
+
+    private LookupService getLookupService(boolean isUsingHttpLookup) {
+        if (isUsingHttpLookup) {
+            return clientWithHttpLookup.getLookup();
+        } else {
+            return clientWitBinaryLookup.getLookup();
+        }
+    }
+
+    @DataProvider(name = "isUsingHttpLookup")
+    public Object[][] isUsingHttpLookup() {
+        return new Object[][]{
+            {true},
+            {false}
+        };
+    }
+
+    @Test(dataProvider = "isUsingHttpLookup")
+    public void testGetTopicsOfGetTopicsResult(boolean isUsingHttpLookup) 
throws Exception {
+        LookupService lookupService = getLookupService(isUsingHttpLookup);
+        String nonPartitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+        String partitionedTopic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createPartitionedTopic(partitionedTopic, 3);
+        String nonPersistentTopic = 
BrokerTestUtil.newUniqueName("non-persistent://public/default/tp");
+
+        // Verify the new method "GetTopicsResult.getTopics" works as expected.
+        Collection<String> topics = 
lookupService.getTopicsUnderNamespace(NamespaceName.get("public/default"),
+                Mode.PERSISTENT, "public/default/.*", null).join().getTopics();
+        assertTrue(topics.contains(nonPartitionedTopic));
+        assertTrue(topics.contains(partitionedTopic));
+        assertFalse(topics.contains(nonPersistentTopic));
+        
assertFalse(topics.contains(TopicName.get(partitionedTopic).getPartition(0).toString()));
+        // Verify the new method 
"GetTopicsResult.getNonPartitionedOrPartitionTopics" works as expected.
+        Collection<String> nonPartitionedOrPartitionTopics =
+                
lookupService.getTopicsUnderNamespace(NamespaceName.get("public/default"),
+                Mode.PERSISTENT, "public/default/.*", null).join()
+                .getNonPartitionedOrPartitionTopics();
+        
assertTrue(nonPartitionedOrPartitionTopics.contains(nonPartitionedTopic));
+        
assertFalse(nonPartitionedOrPartitionTopics.contains(partitionedTopic));
+        
assertFalse(nonPartitionedOrPartitionTopics.contains(nonPersistentTopic));
+        
assertTrue(nonPartitionedOrPartitionTopics.contains(TopicName.get(partitionedTopic).getPartition(0)
+                .toString()));
+        
assertTrue(nonPartitionedOrPartitionTopics.contains(TopicName.get(partitionedTopic).getPartition(1)
+                .toString()));
+        
assertTrue(nonPartitionedOrPartitionTopics.contains(TopicName.get(partitionedTopic).getPartition(2)
+                .toString()));
+
+        // Cleanup.
+        admin.topics().deletePartitionedTopic(partitionedTopic, false);
+        admin.topics().delete(nonPartitionedTopic, false);
+    }
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 510bf8a244a..19ce53ed066 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -22,8 +22,6 @@ import static java.lang.String.format;
 import io.netty.buffer.ByteBuf;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -341,17 +339,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                         log.debug("[namespace: {}] Success get topics list in 
request: {}",
                                 namespace, requestId);
                     }
-                    // do not keep partition part of topic name
-                    List<String> result = new ArrayList<>();
-                    r.getTopics().forEach(topic -> {
-                        String filtered = 
TopicName.get(topic).getPartitionedTopicName();
-                        if (!result.contains(filtered)) {
-                            result.add(filtered);
-                        }
-                    });
-
-                    getTopicsResultFuture.complete(new GetTopicsResult(result, 
r.getTopicsHash(),
-                            r.isFiltered(), r.isChanged()));
+                    getTopicsResultFuture.complete(r);
                 }
                 client.getCnxPool().releaseConnection(clientCnx);
             });
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 02d0d10626f..133e60015a7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -23,10 +23,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Base64;
-import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
@@ -137,15 +134,7 @@ public class HttpLookupService implements LookupService {
         httpClient
             .get(String.format(format, namespace, mode.toString()), 
String[].class)
             .thenAccept(topics -> {
-                List<String> result = new ArrayList<>();
-                // do not keep partition part of topic name
-                Arrays.asList(topics).forEach(topic -> {
-                    String filtered = 
TopicName.get(topic).getPartitionedTopicName();
-                    if (!result.contains(filtered)) {
-                        result.add(filtered);
-                    }
-                });
-                future.complete(new GetTopicsResult(result, topicsHash, false, 
true));
+                future.complete(new GetTopicsResult(topics));
             }).exceptionally(ex -> {
                 Throwable cause = FutureUtil.unwrapCompletionException(ex);
                 log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", 
namespace, cause.getMessage());
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
index 55fe6253ff9..80f16e6c367 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java
@@ -18,21 +18,105 @@
  */
 package org.apache.pulsar.common.lookup;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
 import lombok.ToString;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.topics.TopicList;
 
-@Getter
-@Setter
-@AllArgsConstructor
-@NoArgsConstructor
+/***
+ * A value object.
+ * - The response of HTTP API "admin/v2/namespaces/{domain}/topics" is an 
array of topics (non-partitioned topics or partitions).
+ *   It will be wrapped as "topics: {topic array}, topicsHash: null, 
filtered: false, changed: true".
+ * - The response of binary API {@link CommandGetTopicsOfNamespace} is a 
{@link CommandGetTopicsOfNamespaceResponse},
+ *   which will be converted to a {@link GetTopicsResult}.
+ * See more details https://github.com/apache/pulsar/pull/14804.
+ */
 @ToString
 public class GetTopicsResult {
-    private List<String> topics;
-    private String topicsHash;
-    private boolean filtered;
-    private boolean changed;
+
+    /**
+     * Non-partitioned topics, and topic partitions of partitioned topics.
+     */
+    @Getter
+    private final List<String> nonPartitionedOrPartitionTopics;
+
+    /**
+     * Whether the topics have been filtered by the broker using a regexp. If not, the 
client should do a client-side filter.
+     * There are three cases in which brokers will not filter the topics:
+     * 1. the lookup service is an HTTP lookup service; the HTTP API has 
not implemented this feature yet.
+     * 2. the broker does not support this feature (in other words, its version 
is lower than "2.11.0").
+     * 3. the input param "topicPattern" is longer than the broker config 
"subscriptionPatternMaxLength".
+     */
+    @Getter
+    private final boolean filtered;
+
+    /**
+     * The topics hash that was calculated by {@link 
TopicList#calculateHash(List)}. The topics used
+     * to calculate the hash code are only the topics that have been 
filtered.
+     * Note: It is always "null" if the broker did not filter the topics when 
calling the API
+     * "LookupService.getTopicsUnderNamespace" (in other words, {@link 
#filtered} is false).
+     */
+    @Getter
+    private final String topicsHash;
+
+    /**
+     * Whether the topics hash has changed compared with the input param 
"topicsHash" passed when calling
+     * "LookupService.getTopicsUnderNamespace".
+     * Note: It is always set to "true" if the input param "topicsHash" used 
to call
+     * "LookupService.getTopicsUnderNamespace" is null or the "LookupService" 
is "HttpLookupService".
+     */
+    @Getter
+    private final boolean changed;
+
+    /**
+     * Partitioned topics and non-partitioned topics.
+     * In other words, there are no topic partitions of partitioned topics in 
this list.
+     * Note: it is not a field of the response of 
"LookupService.getTopicsUnderNamespace", it is generated in
+     * client-side memory.
+     */
+    private volatile List<String> topics;
+
+    /**
+     * This constructor is used for binary API.
+     */
+    public GetTopicsResult(List<String> nonPartitionedOrPartitionTopics, 
String topicsHash, boolean filtered,
+                           boolean changed) {
+        this.nonPartitionedOrPartitionTopics = nonPartitionedOrPartitionTopics;
+        this.topicsHash = topicsHash;
+        this.filtered = filtered;
+        this.changed = changed;
+    }
+
+    /**
+     * This constructor is used for HTTP API.
+     */
+    public GetTopicsResult(String[] nonPartitionedOrPartitionTopics) {
+        this(Arrays.asList(nonPartitionedOrPartitionTopics), null, false, 
true);
+    }
+
+    public List<String> getTopics() {
+        if (topics != null) {
+            return topics;
+        }
+        synchronized (this) {
+            if (topics != null) {
+                return topics;
+            }
+            // Group partitioned topics.
+            List<String> grouped = new ArrayList<>();
+            for (String topic : nonPartitionedOrPartitionTopics) {
+                String partitionedTopic = 
TopicName.get(topic).getPartitionedTopicName();
+                if (!grouped.contains(partitionedTopic)) {
+                    grouped.add(partitionedTopic);
+                }
+            }
+            topics = grouped;
+            return topics;
+        }
+    }
 }

Reply via email to