This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9e7df91d49f [improve][broker] Add broker filter sync method back to guarantee the API compatibility (#20826)
9e7df91d49f is described below

commit 9e7df91d49fe9a097c64f52d79f436d7676aae17
Author: Kai Wang <[email protected]>
AuthorDate: Mon Jul 24 10:43:07 2023 +0800

    [improve][broker] Add broker filter sync method back to guarantee the API compatibility (#20826)
    
    (cherry picked from commit 69d7a2bf14555f11a716a9545c5cf391d8179a27)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  2 +-
 .../filter/AntiAffinityGroupPolicyFilter.java      |  2 +-
 .../extensions/filter/BrokerFilter.java            | 22 +++++++++++++++++---
 .../filter/BrokerIsolationPoliciesFilter.java      |  6 +++---
 .../filter/BrokerLoadManagerClassFilter.java       |  2 +-
 .../filter/BrokerMaxTopicCountFilter.java          |  6 +++---
 .../extensions/filter/BrokerVersionFilter.java     |  6 +++---
 .../extensions/scheduler/TransferShedder.java      |  2 +-
 .../AntiAffinityNamespaceGroupExtensionTest.java   |  4 ++--
 .../extensions/ExtensibleLoadManagerImplTest.java  | 24 +++++++++++-----------
 .../filter/BrokerIsolationPoliciesFilterTest.java  | 20 +++++++++---------
 .../filter/BrokerLoadManagerClassFilterTest.java   |  4 ++--
 .../filter/BrokerMaxTopicCountFilterTest.java      |  2 +-
 .../extensions/filter/BrokerVersionFilterTest.java | 12 +++++------
 .../extensions/scheduler/TransferShedderTest.java  |  6 +++---
 15 files changed, 68 insertions(+), 52 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index f3c5e5dc2ee..cba499eb8ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -493,7 +493,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                             new ArrayList<>(filterPipeline.size());
                     for (final BrokerFilter filter : filterPipeline) {
                         CompletableFuture<Map<String, BrokerLookupData>> 
future =
-                                filter.filter(availableBrokerCandidates, 
bundle, context);
+                                filter.filterAsync(availableBrokerCandidates, 
bundle, context);
                         futures.add(future);
                     }
                     CompletableFuture<Optional<String>> result = new 
CompletableFuture<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
index df08ba96b9d..37b35d8661d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
@@ -39,7 +39,7 @@ public class AntiAffinityGroupPolicyFilter implements 
BrokerFilter {
     }
 
     @Override
-    public CompletableFuture<Map<String, BrokerLookupData>> filter(
+    public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(
             Map<String, BrokerLookupData> brokers, ServiceUnitId 
serviceUnitId, LoadManagerContext context) {
         return helper.filterAsync(brokers, serviceUnitId.toString());
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
index 046cae48914..2950a013389 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -20,6 +20,7 @@ package 
org.apache.pulsar.broker.loadbalance.extensions.filter;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -42,8 +43,23 @@ public interface BrokerFilter {
      * @param context The load manager context.
      * @return Filtered broker list.
      */
-    CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, 
BrokerLookupData> brokers,
-                                                            ServiceUnitId 
serviceUnit,
-                                                            LoadManagerContext 
context);
+    @Deprecated
+    default Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> 
brokers,
+                                                 ServiceUnitId serviceUnit,
+                                                 LoadManagerContext context) 
throws BrokerFilterException {
+        return filterAsync(brokers, serviceUnit, context).join();
+    }
+
+    /**
+     * Filter out async unqualified brokers based on implementation.
+     *
+     * @param brokers The full broker and lookup data.
+     * @param serviceUnit The current serviceUnit.
+     * @param context The load manager context.
+     * @return Filtered broker list.
+     */
+    CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, 
BrokerLookupData> brokers,
+                                                                 ServiceUnitId 
serviceUnit,
+                                                                 
LoadManagerContext context);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
index 0aa1dda437a..306c4c36f48 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
@@ -44,9 +44,9 @@ public class BrokerIsolationPoliciesFilter implements 
BrokerFilter {
     }
 
     @Override
-    public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, 
BrokerLookupData> availableBrokers,
-                                    ServiceUnitId serviceUnit,
-                                    LoadManagerContext context) {
+    public CompletableFuture<Map<String, BrokerLookupData>> 
filterAsync(Map<String, BrokerLookupData> availableBrokers,
+                                                                        
ServiceUnitId serviceUnit,
+                                                                        
LoadManagerContext context) {
         return 
isolationPoliciesHelper.applyIsolationPoliciesAsync(availableBrokers, 
serviceUnit)
                 .thenApply(brokerCandidateCache -> {
                     availableBrokers.keySet().retainAll(brokerCandidateCache);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
index 6504da047a0..54d2a555e61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
@@ -34,7 +34,7 @@ public class BrokerLoadManagerClassFilter implements 
BrokerFilter {
     }
 
     @Override
-    public CompletableFuture<Map<String, BrokerLookupData>> filter(
+    public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(
             Map<String, BrokerLookupData> brokers,
             ServiceUnitId serviceUnit,
             LoadManagerContext context) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
index 044dcc83a78..472cabf1566 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
@@ -36,9 +36,9 @@ public class BrokerMaxTopicCountFilter implements 
BrokerFilter {
     }
 
     @Override
-    public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, 
BrokerLookupData> brokers,
-                                                                   
ServiceUnitId serviceUnit,
-                                                                   
LoadManagerContext context) {
+    public CompletableFuture<Map<String, BrokerLookupData>> 
filterAsync(Map<String, BrokerLookupData> brokers,
+                                                                        
ServiceUnitId serviceUnit,
+                                                                        
LoadManagerContext context) {
         int loadBalancerBrokerMaxTopics = 
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
         brokers.keySet().removeIf(broker -> {
             Optional<BrokerLoadData> brokerLoadDataOpt = 
context.brokerLoadDataStore().get(broker);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
index 6e8ce5e58f2..1af39a6adb5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
@@ -47,9 +47,9 @@ public class BrokerVersionFilter implements BrokerFilter {
      *
      */
     @Override
-    public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, 
BrokerLookupData> brokers,
-                                                                   
ServiceUnitId serviceUnit,
-                                                                   
LoadManagerContext context) {
+    public CompletableFuture<Map<String, BrokerLookupData>> 
filterAsync(Map<String, BrokerLookupData> brokers,
+                                                                        
ServiceUnitId serviceUnit,
+                                                                        
LoadManagerContext context) {
         ServiceConfiguration conf = context.brokerConfiguration();
         if (!conf.isPreferLaterVersions() || brokers.isEmpty()) {
             return CompletableFuture.completedFuture(brokers);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 7bb16bac124..cd5c17febc2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -719,7 +719,7 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
         Map<String, BrokerLookupData> candidates = new 
HashMap<>(availableBrokers);
         for (var filter : brokerFilterPipeline) {
             try {
-                filter.filter(candidates, namespaceBundle, context)
+                filter.filterAsync(candidates, namespaceBundle, context)
                         
.get(context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(),
                                 TimeUnit.SECONDS);
             } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
index 8a73a5fa500..2293ecd2c24 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
@@ -130,7 +130,7 @@ public class AntiAffinityNamespaceGroupExtensionTest 
extends AntiAffinityNamespa
         doReturn(namespace + "/" + bundle).when(namespaceBundle).toString();
 
         var expected = new HashMap<>(brokers);
-        var actual = antiAffinityGroupPolicyFilter.filter(
+        var actual = antiAffinityGroupPolicyFilter.filterAsync(
                 brokers, namespaceBundle, context).get();
         assertEquals(actual, expected);
 
@@ -141,7 +141,7 @@ public class AntiAffinityNamespaceGroupExtensionTest 
extends AntiAffinityNamespa
         var srcBroker = 
serviceUnitStateChannel.getOwnerAsync(namespaceBundle.toString())
                 .get(5, TimeUnit.SECONDS).get();
         expected.remove(srcBroker);
-        actual = antiAffinityGroupPolicyFilter.filter(
+        actual = antiAffinityGroupPolicyFilter.filterAsync(
                 brokers, namespaceBundle, context).get();
         assertEquals(actual, expected);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index cff92ab27c6..410c10eeac7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -267,9 +267,9 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
             }
 
             @Override
-            public CompletableFuture<Map<String, BrokerLookupData>> 
filter(Map<String, BrokerLookupData> brokers,
-                                                                           
ServiceUnitId serviceUnit,
-                                                                           
LoadManagerContext context) {
+            public CompletableFuture<Map<String, BrokerLookupData>> 
filterAsync(Map<String, BrokerLookupData> brokers,
+                                                                               
 ServiceUnitId serviceUnit,
+                                                                               
 LoadManagerContext context) {
                 brokers.remove(pulsar1.getLookupServiceAddress());
                 return CompletableFuture.completedFuture(brokers);
             }
@@ -288,9 +288,9 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
 
         doReturn(List.of(new MockBrokerFilter() {
             @Override
-            public CompletableFuture<Map<String, BrokerLookupData>> 
filter(Map<String, BrokerLookupData> brokers,
-                                                                           
ServiceUnitId serviceUnit,
-                                                                           
LoadManagerContext context) {
+            public CompletableFuture<Map<String, BrokerLookupData>> 
filterAsync(Map<String, BrokerLookupData> brokers,
+                                                                               
 ServiceUnitId serviceUnit,
+                                                                               
 LoadManagerContext context) {
                 brokers.remove(brokers.keySet().iterator().next());
                 return FutureUtil.failedFuture(new 
BrokerFilterException("Test"));
             }
@@ -513,17 +513,17 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
         doReturn(List.of(new MockBrokerFilter() {
             @Override
-            public CompletableFuture<Map<String, BrokerLookupData>> 
filter(Map<String, BrokerLookupData> brokers,
-                                                                           
ServiceUnitId serviceUnit,
-                                                                           
LoadManagerContext context) {
+            public CompletableFuture<Map<String, BrokerLookupData>> 
filterAsync(Map<String, BrokerLookupData> brokers,
+                                                                               
 ServiceUnitId serviceUnit,
+                                                                               
 LoadManagerContext context) {
                 brokers.remove(lookupServiceAddress1);
                 return CompletableFuture.completedFuture(brokers);
             }
         },new MockBrokerFilter() {
             @Override
-            public CompletableFuture<Map<String, BrokerLookupData>> 
filter(Map<String, BrokerLookupData> brokers,
-                                                                           
ServiceUnitId serviceUnit,
-                                                                           
LoadManagerContext context) {
+            public CompletableFuture<Map<String, BrokerLookupData>> 
filterAsync(Map<String, BrokerLookupData> brokers,
+                                                                               
 ServiceUnitId serviceUnit,
+                                                                               
 LoadManagerContext context) {
                 return FutureUtil.failedFuture(new 
BrokerFilterException("Test"));
             }
         })).when(primaryLoadManager).getBrokerFilterPipeline();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
index b8d5d98fd4c..f45e1405e1d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
@@ -83,20 +83,20 @@ public class BrokerIsolationPoliciesFilterTest {
         BrokerIsolationPoliciesFilter filter = new 
BrokerIsolationPoliciesFilter(isolationPoliciesHelper);
 
         // a. available-brokers: broker1, broker2, broker3 => result: broker1
-        Map<String, BrokerLookupData> result = filter.filter(new 
HashMap<>(Map.of(
+        Map<String, BrokerLookupData> result = filter.filterAsync(new 
HashMap<>(Map.of(
                 "broker1", getLookupData(),
                 "broker2", getLookupData(),
                 "broker3", getLookupData())), namespaceName, 
getContext()).get();
         assertEquals(result.keySet(), Set.of("broker1"));
 
         // b. available-brokers: broker2, broker3          => result: broker2
-        result = filter.filter(new HashMap<>(Map.of(
+        result = filter.filterAsync(new HashMap<>(Map.of(
                 "broker2", getLookupData(),
                 "broker3", getLookupData())), namespaceName, 
getContext()).get();
         assertEquals(result.keySet(), Set.of("broker2"));
 
         // c. available-brokers: broker3                   => result: NULL
-        result = filter.filter(new HashMap<>(Map.of(
+        result = filter.filterAsync(new HashMap<>(Map.of(
                 "broker3", getLookupData())), namespaceName, 
getContext()).get();
         assertTrue(result.isEmpty());
 
@@ -104,20 +104,20 @@ public class BrokerIsolationPoliciesFilterTest {
         setIsolationPolicies(policies, namespaceName, Set.of("broker1"), 
Set.of("broker2"), Set.of("broker3"), 2);
 
         // a. available-brokers: broker1, broker2, broker3 => result: broker1, 
broker2
-        result = filter.filter(new HashMap<>(Map.of(
+        result = filter.filterAsync(new HashMap<>(Map.of(
                 "broker1", getLookupData(),
                 "broker2", getLookupData(),
                 "broker3", getLookupData())), namespaceName, 
getContext()).get();
         assertEquals(result.keySet(), Set.of("broker1", "broker2"));
 
         // b. available-brokers: broker2, broker3          => result: broker2
-        result = filter.filter(new HashMap<>(Map.of(
+        result = filter.filterAsync(new HashMap<>(Map.of(
                 "broker2", getLookupData(),
                 "broker3", getLookupData())), namespaceName, 
getContext()).get();
         assertEquals(result.keySet(), Set.of("broker2"));
 
         // c. available-brokers: broker3                   => result: NULL
-        result = filter.filter(new HashMap<>(Map.of(
+        result = filter.filterAsync(new HashMap<>(Map.of(
                 "broker3", getLookupData())), namespaceName, 
getContext()).get();
         assertTrue(result.isEmpty());
     }
@@ -141,14 +141,14 @@ public class BrokerIsolationPoliciesFilterTest {
 
 
 
-        Map<String, BrokerLookupData> result = filter.filter(new 
HashMap<>(Map.of(
+        Map<String, BrokerLookupData> result = filter.filterAsync(new 
HashMap<>(Map.of(
                 "broker1", getLookupData(),
                 "broker2", getLookupData(),
                 "broker3", getLookupData())), namespaceBundle, 
getContext()).get();
         assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
 
 
-        result = filter.filter(new HashMap<>(Map.of(
+        result = filter.filterAsync(new HashMap<>(Map.of(
                 "broker1", getLookupData(true, false),
                 "broker2", getLookupData(true, false),
                 "broker3", getLookupData())), namespaceBundle, 
getContext()).get();
@@ -156,13 +156,13 @@ public class BrokerIsolationPoliciesFilterTest {
 
         doReturn(false).when(namespaceBundle).hasNonPersistentTopic();
 
-        result = filter.filter(new HashMap<>(Map.of(
+        result = filter.filterAsync(new HashMap<>(Map.of(
                 "broker1", getLookupData(),
                 "broker2", getLookupData(),
                 "broker3", getLookupData())), namespaceBundle, 
getContext()).get();
         assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
 
-        result = filter.filter(new HashMap<>(Map.of(
+        result = filter.filterAsync(new HashMap<>(Map.of(
                 "broker1", getLookupData(false, true),
                 "broker2", getLookupData(),
                 "broker3", getLookupData())), namespaceBundle, 
getContext()).get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
index 6b35bf71a03..17475b41957 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
@@ -49,14 +49,14 @@ public class BrokerLoadManagerClassFilterTest extends 
BrokerFilterTestBase {
                 "broker5", getLookupData("3.0.0", null)
         );
 
-        Map<String, BrokerLookupData> result = filter.filter(new 
HashMap<>(originalBrokers), null, context).get();
+        Map<String, BrokerLookupData> result = filter.filterAsync(new 
HashMap<>(originalBrokers), null, context).get();
         assertEquals(result, Map.of(
                 "broker1", getLookupData("3.0.0", 
ExtensibleLoadManagerImpl.class.getName()),
                 "broker2", getLookupData("3.0.0", 
ExtensibleLoadManagerImpl.class.getName())
         ));
 
         
context.brokerConfiguration().setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
-        result = filter.filter(new HashMap<>(originalBrokers), null, 
context).get();
+        result = filter.filterAsync(new HashMap<>(originalBrokers), null, 
context).get();
 
         assertEquals(result, Map.of(
                 "broker3", getLookupData("3.0.0", 
ModularLoadManagerImpl.class.getName()),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
index a6214304bd2..9d000cb91a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
@@ -60,7 +60,7 @@ public class BrokerMaxTopicCountFilterTest extends 
BrokerFilterTestBase {
                 "broker4", getLookupData()
         );
         Map<String, BrokerLookupData> result =
-                filter.filter(new HashMap<>(originalBrokers), null, 
context).get();
+                filter.filterAsync(new HashMap<>(originalBrokers), null, 
context).get();
         assertEquals(result, Map.of(
                 "broker2", getLookupData(),
                 "broker4", getLookupData()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
index 4a1a4543938..2aa7faeb599 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
@@ -43,7 +43,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
     @Test
     public void testFilterEmptyBrokerList() throws BrokerFilterException, 
ExecutionException, InterruptedException {
         BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
-        Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new 
HashMap<>(), null, getContext()).get();
+        Map<String, BrokerLookupData> result = 
brokerVersionFilter.filterAsync(new HashMap<>(), null, getContext()).get();
         assertTrue(result.isEmpty());
     }
 
@@ -60,7 +60,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
         );
         Map<String, BrokerLookupData> brokers = new HashMap<>(originalBrokers);
         BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
-        Map<String, BrokerLookupData> result = 
brokerVersionFilter.filter(brokers, null, context).get();
+        Map<String, BrokerLookupData> result = 
brokerVersionFilter.filterAsync(brokers, null, context).get();
         assertEquals(result, originalBrokers);
     }
 
@@ -73,7 +73,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
                 "localhost:6653", getLookupData("2.10.1")
         );
         BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
-        Map<String, BrokerLookupData> result = brokerVersionFilter.filter(
+        Map<String, BrokerLookupData> result = brokerVersionFilter.filterAsync(
                 new HashMap<>(originalBrokers), null, getContext()).get();
         assertEquals(result, Map.of(
                 "localhost:6651", getLookupData("2.10.1"),
@@ -87,7 +87,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
                 "localhost:6652", getLookupData("2.10.1"),
                 "localhost:6653", getLookupData("2.10.1")
         );
-        result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
null, getContext()).get();
+        result = brokerVersionFilter.filterAsync(new 
HashMap<>(originalBrokers), null, getContext()).get();
 
         assertEquals(result, Map.of(
                 "localhost:6652", getLookupData("2.10.1"),
@@ -101,7 +101,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
                 "localhost:6653", getLookupData("2.10.2-SNAPSHOT")
         );
 
-        result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
null, getContext()).get();
+        result = brokerVersionFilter.filterAsync(new 
HashMap<>(originalBrokers), null, getContext()).get();
         assertEquals(result, Map.of(
                 "localhost:6653", getLookupData("2.10.2-SNAPSHOT")
         ));
@@ -115,7 +115,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
         );
         BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
         try {
-            brokerVersionFilter.filter(new HashMap<>(originalBrokers), null, 
getContext()).get();
+            brokerVersionFilter.filterAsync(new HashMap<>(originalBrokers), 
null, getContext()).get();
             fail();
         } catch (Exception ex) {
             assertEquals(ex.getCause().getClass(), 
BrokerFilterBadVersionException.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 3dd41a92e68..26d95a0158d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -764,9 +764,9 @@ public class TransferShedderTest {
             }
 
             @Override
-            public CompletableFuture<Map<String, BrokerLookupData>> 
filter(Map<String, BrokerLookupData> brokers,
-                                                                           
ServiceUnitId serviceUnit,
-                                                                           
LoadManagerContext context) {
+            public CompletableFuture<Map<String, BrokerLookupData>> 
filterAsync(Map<String, BrokerLookupData> brokers,
+                                                                               
 ServiceUnitId serviceUnit,
+                                                                               
 LoadManagerContext context) {
                 return FutureUtil.failedFuture(new 
BrokerFilterException("test"));
             }
         };

Reply via email to