This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9e7df91d49f [improve][broker] Add broker filter sync method back to
guarantee the API compatibility (#20826)
9e7df91d49f is described below
commit 9e7df91d49fe9a097c64f52d79f436d7676aae17
Author: Kai Wang <[email protected]>
AuthorDate: Mon Jul 24 10:43:07 2023 +0800
[improve][broker] Add broker filter sync method back to guarantee the API
compatibility (#20826)
(cherry picked from commit 69d7a2bf14555f11a716a9545c5cf391d8179a27)
---
.../extensions/ExtensibleLoadManagerImpl.java | 2 +-
.../filter/AntiAffinityGroupPolicyFilter.java | 2 +-
.../extensions/filter/BrokerFilter.java | 22 +++++++++++++++++---
.../filter/BrokerIsolationPoliciesFilter.java | 6 +++---
.../filter/BrokerLoadManagerClassFilter.java | 2 +-
.../filter/BrokerMaxTopicCountFilter.java | 6 +++---
.../extensions/filter/BrokerVersionFilter.java | 6 +++---
.../extensions/scheduler/TransferShedder.java | 2 +-
.../AntiAffinityNamespaceGroupExtensionTest.java | 4 ++--
.../extensions/ExtensibleLoadManagerImplTest.java | 24 +++++++++++-----------
.../filter/BrokerIsolationPoliciesFilterTest.java | 20 +++++++++---------
.../filter/BrokerLoadManagerClassFilterTest.java | 4 ++--
.../filter/BrokerMaxTopicCountFilterTest.java | 2 +-
.../extensions/filter/BrokerVersionFilterTest.java | 12 +++++------
.../extensions/scheduler/TransferShedderTest.java | 6 +++---
15 files changed, 68 insertions(+), 52 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index f3c5e5dc2ee..cba499eb8ee 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -493,7 +493,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
new ArrayList<>(filterPipeline.size());
for (final BrokerFilter filter : filterPipeline) {
CompletableFuture<Map<String, BrokerLookupData>>
future =
- filter.filter(availableBrokerCandidates,
bundle, context);
+ filter.filterAsync(availableBrokerCandidates,
bundle, context);
futures.add(future);
}
CompletableFuture<Optional<String>> result = new
CompletableFuture<>();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
index df08ba96b9d..37b35d8661d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
@@ -39,7 +39,7 @@ public class AntiAffinityGroupPolicyFilter implements
BrokerFilter {
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(
Map<String, BrokerLookupData> brokers, ServiceUnitId
serviceUnitId, LoadManagerContext context) {
return helper.filterAsync(brokers, serviceUnitId.toString());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
index 046cae48914..2950a013389 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -20,6 +20,7 @@ package
org.apache.pulsar.broker.loadbalance.extensions.filter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -42,8 +43,23 @@ public interface BrokerFilter {
* @param context The load manager context.
* @return Filtered broker list.
*/
- CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String,
BrokerLookupData> brokers,
- ServiceUnitId
serviceUnit,
- LoadManagerContext
context);
+ @Deprecated
+ default Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData>
brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context)
throws BrokerFilterException {
+ return filterAsync(brokers, serviceUnit, context).join();
+ }
+
+ /**
+ * Filter out async unqualified brokers based on implementation.
+ *
+ * @param brokers The full broker and lookup data.
+ * @param serviceUnit The current serviceUnit.
+ * @param context The load manager context.
+ * @return Filtered broker list.
+ */
+ CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
BrokerLookupData> brokers,
+ ServiceUnitId
serviceUnit,
+
LoadManagerContext context);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
index 0aa1dda437a..306c4c36f48 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
@@ -44,9 +44,9 @@ public class BrokerIsolationPoliciesFilter implements
BrokerFilter {
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String,
BrokerLookupData> availableBrokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>>
filterAsync(Map<String, BrokerLookupData> availableBrokers,
+
ServiceUnitId serviceUnit,
+
LoadManagerContext context) {
return
isolationPoliciesHelper.applyIsolationPoliciesAsync(availableBrokers,
serviceUnit)
.thenApply(brokerCandidateCache -> {
availableBrokers.keySet().retainAll(brokerCandidateCache);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
index 6504da047a0..54d2a555e61 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
@@ -34,7 +34,7 @@ public class BrokerLoadManagerClassFilter implements
BrokerFilter {
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(
Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
LoadManagerContext context) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
index 044dcc83a78..472cabf1566 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
@@ -36,9 +36,9 @@ public class BrokerMaxTopicCountFilter implements
BrokerFilter {
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String,
BrokerLookupData> brokers,
-
ServiceUnitId serviceUnit,
-
LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>>
filterAsync(Map<String, BrokerLookupData> brokers,
+
ServiceUnitId serviceUnit,
+
LoadManagerContext context) {
int loadBalancerBrokerMaxTopics =
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
Optional<BrokerLoadData> brokerLoadDataOpt =
context.brokerLoadDataStore().get(broker);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
index 6e8ce5e58f2..1af39a6adb5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
@@ -47,9 +47,9 @@ public class BrokerVersionFilter implements BrokerFilter {
*
*/
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String,
BrokerLookupData> brokers,
-
ServiceUnitId serviceUnit,
-
LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>>
filterAsync(Map<String, BrokerLookupData> brokers,
+
ServiceUnitId serviceUnit,
+
LoadManagerContext context) {
ServiceConfiguration conf = context.brokerConfiguration();
if (!conf.isPreferLaterVersions() || brokers.isEmpty()) {
return CompletableFuture.completedFuture(brokers);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 7bb16bac124..cd5c17febc2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -719,7 +719,7 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
Map<String, BrokerLookupData> candidates = new
HashMap<>(availableBrokers);
for (var filter : brokerFilterPipeline) {
try {
- filter.filter(candidates, namespaceBundle, context)
+ filter.filterAsync(candidates, namespaceBundle, context)
.get(context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException |
TimeoutException e) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
index 8a73a5fa500..2293ecd2c24 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
@@ -130,7 +130,7 @@ public class AntiAffinityNamespaceGroupExtensionTest
extends AntiAffinityNamespa
doReturn(namespace + "/" + bundle).when(namespaceBundle).toString();
var expected = new HashMap<>(brokers);
- var actual = antiAffinityGroupPolicyFilter.filter(
+ var actual = antiAffinityGroupPolicyFilter.filterAsync(
brokers, namespaceBundle, context).get();
assertEquals(actual, expected);
@@ -141,7 +141,7 @@ public class AntiAffinityNamespaceGroupExtensionTest
extends AntiAffinityNamespa
var srcBroker =
serviceUnitStateChannel.getOwnerAsync(namespaceBundle.toString())
.get(5, TimeUnit.SECONDS).get();
expected.remove(srcBroker);
- actual = antiAffinityGroupPolicyFilter.filter(
+ actual = antiAffinityGroupPolicyFilter.filterAsync(
brokers, namespaceBundle, context).get();
assertEquals(actual, expected);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index cff92ab27c6..410c10eeac7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -267,9 +267,9 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>>
filter(Map<String, BrokerLookupData> brokers,
-
ServiceUnitId serviceUnit,
-
LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>>
filterAsync(Map<String, BrokerLookupData> brokers,
+
ServiceUnitId serviceUnit,
+
LoadManagerContext context) {
brokers.remove(pulsar1.getLookupServiceAddress());
return CompletableFuture.completedFuture(brokers);
}
@@ -288,9 +288,9 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
doReturn(List.of(new MockBrokerFilter() {
@Override
- public CompletableFuture<Map<String, BrokerLookupData>>
filter(Map<String, BrokerLookupData> brokers,
-
ServiceUnitId serviceUnit,
-
LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>>
filterAsync(Map<String, BrokerLookupData> brokers,
+
ServiceUnitId serviceUnit,
+
LoadManagerContext context) {
brokers.remove(brokers.keySet().iterator().next());
return FutureUtil.failedFuture(new
BrokerFilterException("Test"));
}
@@ -513,17 +513,17 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
doReturn(List.of(new MockBrokerFilter() {
@Override
- public CompletableFuture<Map<String, BrokerLookupData>>
filter(Map<String, BrokerLookupData> brokers,
-
ServiceUnitId serviceUnit,
-
LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>>
filterAsync(Map<String, BrokerLookupData> brokers,
+
ServiceUnitId serviceUnit,
+
LoadManagerContext context) {
brokers.remove(lookupServiceAddress1);
return CompletableFuture.completedFuture(brokers);
}
},new MockBrokerFilter() {
@Override
- public CompletableFuture<Map<String, BrokerLookupData>>
filter(Map<String, BrokerLookupData> brokers,
-
ServiceUnitId serviceUnit,
-
LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>>
filterAsync(Map<String, BrokerLookupData> brokers,
+
ServiceUnitId serviceUnit,
+
LoadManagerContext context) {
return FutureUtil.failedFuture(new
BrokerFilterException("Test"));
}
})).when(primaryLoadManager).getBrokerFilterPipeline();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
index b8d5d98fd4c..f45e1405e1d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
@@ -83,20 +83,20 @@ public class BrokerIsolationPoliciesFilterTest {
BrokerIsolationPoliciesFilter filter = new
BrokerIsolationPoliciesFilter(isolationPoliciesHelper);
// a. available-brokers: broker1, broker2, broker3 => result: broker1
- Map<String, BrokerLookupData> result = filter.filter(new
HashMap<>(Map.of(
+ Map<String, BrokerLookupData> result = filter.filterAsync(new
HashMap<>(Map.of(
"broker1", getLookupData(),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceName,
getContext()).get();
assertEquals(result.keySet(), Set.of("broker1"));
// b. available-brokers: broker2, broker3 => result: broker2
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceName,
getContext()).get();
assertEquals(result.keySet(), Set.of("broker2"));
// c. available-brokers: broker3 => result: NULL
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker3", getLookupData())), namespaceName,
getContext()).get();
assertTrue(result.isEmpty());
@@ -104,20 +104,20 @@ public class BrokerIsolationPoliciesFilterTest {
setIsolationPolicies(policies, namespaceName, Set.of("broker1"),
Set.of("broker2"), Set.of("broker3"), 2);
// a. available-brokers: broker1, broker2, broker3 => result: broker1,
broker2
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceName,
getContext()).get();
assertEquals(result.keySet(), Set.of("broker1", "broker2"));
// b. available-brokers: broker2, broker3 => result: broker2
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceName,
getContext()).get();
assertEquals(result.keySet(), Set.of("broker2"));
// c. available-brokers: broker3 => result: NULL
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker3", getLookupData())), namespaceName,
getContext()).get();
assertTrue(result.isEmpty());
}
@@ -141,14 +141,14 @@ public class BrokerIsolationPoliciesFilterTest {
- Map<String, BrokerLookupData> result = filter.filter(new
HashMap<>(Map.of(
+ Map<String, BrokerLookupData> result = filter.filterAsync(new
HashMap<>(Map.of(
"broker1", getLookupData(),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceBundle,
getContext()).get();
assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(true, false),
"broker2", getLookupData(true, false),
"broker3", getLookupData())), namespaceBundle,
getContext()).get();
@@ -156,13 +156,13 @@ public class BrokerIsolationPoliciesFilterTest {
doReturn(false).when(namespaceBundle).hasNonPersistentTopic();
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceBundle,
getContext()).get();
assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(false, true),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceBundle,
getContext()).get();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
index 6b35bf71a03..17475b41957 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
@@ -49,14 +49,14 @@ public class BrokerLoadManagerClassFilterTest extends
BrokerFilterTestBase {
"broker5", getLookupData("3.0.0", null)
);
- Map<String, BrokerLookupData> result = filter.filter(new
HashMap<>(originalBrokers), null, context).get();
+ Map<String, BrokerLookupData> result = filter.filterAsync(new
HashMap<>(originalBrokers), null, context).get();
assertEquals(result, Map.of(
"broker1", getLookupData("3.0.0",
ExtensibleLoadManagerImpl.class.getName()),
"broker2", getLookupData("3.0.0",
ExtensibleLoadManagerImpl.class.getName())
));
context.brokerConfiguration().setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
- result = filter.filter(new HashMap<>(originalBrokers), null,
context).get();
+ result = filter.filterAsync(new HashMap<>(originalBrokers), null,
context).get();
assertEquals(result, Map.of(
"broker3", getLookupData("3.0.0",
ModularLoadManagerImpl.class.getName()),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
index a6214304bd2..9d000cb91a1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
@@ -60,7 +60,7 @@ public class BrokerMaxTopicCountFilterTest extends
BrokerFilterTestBase {
"broker4", getLookupData()
);
Map<String, BrokerLookupData> result =
- filter.filter(new HashMap<>(originalBrokers), null,
context).get();
+ filter.filterAsync(new HashMap<>(originalBrokers), null,
context).get();
assertEquals(result, Map.of(
"broker2", getLookupData(),
"broker4", getLookupData()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
index 4a1a4543938..2aa7faeb599 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
@@ -43,7 +43,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
@Test
public void testFilterEmptyBrokerList() throws BrokerFilterException,
ExecutionException, InterruptedException {
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new
HashMap<>(), null, getContext()).get();
+ Map<String, BrokerLookupData> result =
brokerVersionFilter.filterAsync(new HashMap<>(), null, getContext()).get();
assertTrue(result.isEmpty());
}
@@ -60,7 +60,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
);
Map<String, BrokerLookupData> brokers = new HashMap<>(originalBrokers);
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- Map<String, BrokerLookupData> result =
brokerVersionFilter.filter(brokers, null, context).get();
+ Map<String, BrokerLookupData> result =
brokerVersionFilter.filterAsync(brokers, null, context).get();
assertEquals(result, originalBrokers);
}
@@ -73,7 +73,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
"localhost:6653", getLookupData("2.10.1")
);
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- Map<String, BrokerLookupData> result = brokerVersionFilter.filter(
+ Map<String, BrokerLookupData> result = brokerVersionFilter.filterAsync(
new HashMap<>(originalBrokers), null, getContext()).get();
assertEquals(result, Map.of(
"localhost:6651", getLookupData("2.10.1"),
@@ -87,7 +87,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
"localhost:6652", getLookupData("2.10.1"),
"localhost:6653", getLookupData("2.10.1")
);
- result = brokerVersionFilter.filter(new HashMap<>(originalBrokers),
null, getContext()).get();
+ result = brokerVersionFilter.filterAsync(new
HashMap<>(originalBrokers), null, getContext()).get();
assertEquals(result, Map.of(
"localhost:6652", getLookupData("2.10.1"),
@@ -101,7 +101,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
"localhost:6653", getLookupData("2.10.2-SNAPSHOT")
);
- result = brokerVersionFilter.filter(new HashMap<>(originalBrokers),
null, getContext()).get();
+ result = brokerVersionFilter.filterAsync(new
HashMap<>(originalBrokers), null, getContext()).get();
assertEquals(result, Map.of(
"localhost:6653", getLookupData("2.10.2-SNAPSHOT")
));
@@ -115,7 +115,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
);
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
try {
- brokerVersionFilter.filter(new HashMap<>(originalBrokers), null,
getContext()).get();
+ brokerVersionFilter.filterAsync(new HashMap<>(originalBrokers),
null, getContext()).get();
fail();
} catch (Exception ex) {
assertEquals(ex.getCause().getClass(),
BrokerFilterBadVersionException.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 3dd41a92e68..26d95a0158d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -764,9 +764,9 @@ public class TransferShedderTest {
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>>
filter(Map<String, BrokerLookupData> brokers,
-
ServiceUnitId serviceUnit,
-
LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>>
filterAsync(Map<String, BrokerLookupData> brokers,
+
ServiceUnitId serviceUnit,
+
LoadManagerContext context) {
return FutureUtil.failedFuture(new
BrokerFilterException("test"));
}
};