This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7449baa6e53 [improve][broker] PIP-192: Implement broker version filter
for new load manager (#19023)
7449baa6e53 is described below
commit 7449baa6e53de5c5c1135c6d2e953bf4dcdfd3da
Author: Kai Wang <[email protected]>
AuthorDate: Sat Feb 4 17:30:30 2023 +0800
[improve][broker] PIP-192: Implement broker version filter for new load
manager (#19023)
---
.../extensions/ExtensibleLoadManagerImpl.java | 2 +
.../extensions/filter/BrokerFilter.java | 2 +-
.../extensions/filter/BrokerVersionFilter.java | 147 +++++++++++++++++++++
.../extensions/filter/BrokerVersionFilterTest.java | 140 ++++++++++++++++++++
4 files changed, 290 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index d95bacd157e..66c271ab22e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -38,6 +38,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
@@ -95,6 +96,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
*/
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
+ this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
index 0a76446d3ce..35f4b6817f1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -36,7 +36,7 @@ public interface BrokerFilter {
/**
* Filter out unqualified brokers based on implementation.
*
- * @param brokers The full brokers.
+ * @param brokers The full broker and lookup data.
* @param context The load manager context.
* @return Filtered broker list.
*/
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
new file mode 100644
index 00000000000..869fb049a3c
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import com.github.zafarkhaja.semver.Version;
+import java.util.Iterator;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+
+/**
+ * Filter by broker version.
+ */
+@Slf4j
+public class BrokerVersionFilter implements BrokerFilter {
+
+ public static final String FILTER_NAME = "broker_version_filter";
+
+
+ /**
+ * From the given set of available broker candidates, filter those old
brokers using the version numbers.
+ *
+ * @param brokers The currently available brokers that have not already
been filtered.
+ * @param context The load manager context.
+ *
+ */
+ @Override
+ public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData>
brokers, LoadManagerContext context)
+ throws BrokerFilterException {
+ ServiceConfiguration conf = context.brokerConfiguration();
+ if (!conf.isPreferLaterVersions() || brokers.isEmpty()) {
+ return brokers;
+ }
+
+ Version latestVersion;
+ try {
+ latestVersion = getLatestVersionNumber(brokers);
+ if (log.isDebugEnabled()) {
+ log.debug("Latest broker version found was [{}]",
latestVersion);
+ }
+ } catch (Exception ex) {
+ log.warn("Disabling PreferLaterVersions feature; reason: " +
ex.getMessage());
+ throw new BrokerFilterBadVersionException("Cannot determine newest
broker version: " + ex.getMessage());
+ }
+
+ int numBrokersLatestVersion = 0;
+ int numBrokersOlderVersion = 0;
+
+ Iterator<Map.Entry<String, BrokerLookupData>> brokerIterator =
brokers.entrySet().iterator();
+ while (brokerIterator.hasNext()) {
+ Map.Entry<String, BrokerLookupData> next = brokerIterator.next();
+ String brokerId = next.getKey();
+ String version = next.getValue().brokerVersion();
+ Version brokerVersionVersion = Version.valueOf(version);
+ if (brokerVersionVersion.equals(latestVersion)) {
+ log.debug("Broker [{}] is running the latest version ([{}])",
brokerId, version);
+ numBrokersLatestVersion++;
+ } else {
+ log.info("Broker [{}] is running an older version ([{}]);
latest version is [{}]",
+ brokerId, version, latestVersion);
+ numBrokersOlderVersion++;
+ brokerIterator.remove();
+ }
+ }
+ if (numBrokersOlderVersion == 0) {
+ log.info("All {} brokers are running the latest version [{}]",
numBrokersLatestVersion, latestVersion);
+ }
+ return brokers;
+ }
+
+ /**
+ * Get the most recent broker version number from the broker lookup data
of all the running brokers.
+ * The version number is from the build artifact in the pom and got added
to the package when it was built by Maven
+ *
+ * @param brokerMap
+ * The BrokerId -> BrokerLookupData Map.
+ * @return The most recent broker version
+ * @throws BrokerFilterBadVersionException
+ * If the most recent version is undefined (e.g., a bad broker
version was encountered or a broker
+ * does not have a version string in its lookup data.
+ */
+ public Version getLatestVersionNumber(Map<String, BrokerLookupData>
brokerMap)
+ throws BrokerFilterBadVersionException {
+
+ if (brokerMap.size() == 0) {
+ throw new BrokerFilterBadVersionException(
+ "Unable to determine latest version since broker version
map was empty");
+ }
+
+ Version latestVersion = null;
+ for (Map.Entry<String, BrokerLookupData> entry : brokerMap.entrySet())
{
+ String brokerId = entry.getKey();
+ String version = entry.getValue().brokerVersion();
+ if (null == version || version.length() == 0) {
+ log.warn("No version string in lookup data for broker [{}];
disabling PreferLaterVersions feature",
+ brokerId);
+ // Trigger the load manager to reset all the brokers to the
original set
+ throw new BrokerFilterBadVersionException("No version string
in lookup data for broker \""
+ + brokerId + "\"");
+ }
+ Version brokerVersionVersion;
+ try {
+ brokerVersionVersion = Version.valueOf(version);
+ } catch (Exception x) {
+ log.warn("Invalid version string in lookup data for broker
[{}]: [{}];"
+ + " disabling PreferLaterVersions feature",
+ brokerId, version);
+ // Trigger the load manager to reset all the brokers to the
original set
+ throw new BrokerFilterBadVersionException("Invalid version
string in lookup data for broker \""
+ + brokerId + "\": \"" + version + "\")");
+ }
+
+ if (latestVersion == null) {
+ latestVersion = brokerVersionVersion;
+ } else if (Version.BUILD_AWARE_ORDER.compare(latestVersion,
brokerVersionVersion) < 0) {
+ latestVersion = brokerVersionVersion;
+ }
+ }
+
+ return latestVersion;
+ }
+
+ @Override
+ public String name() {
+ return FILTER_NAME;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
new file mode 100644
index 00000000000..1fcc3836a6f
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link BrokerVersionFilter}.
+ */
+@Test(groups = "broker")
+public class BrokerVersionFilterTest {
+
+
+ @Test
+ public void testFilterEmptyBrokerList() throws BrokerFilterException {
+ BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
+ Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new
HashMap<>(), getContext());
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testDisabledFilter() throws BrokerFilterException {
+ LoadManagerContext context = getContext();
+ ServiceConfiguration configuration = new ServiceConfiguration();
+ configuration.setPreferLaterVersions(false);
+ doReturn(configuration).when(context).brokerConfiguration();
+
+ Map<String, BrokerLookupData> originalBrokers = Map.of(
+ "localhost:6650", getLookupData("2.10.0"),
+ "localhost:6651", getLookupData("2.10.1")
+ );
+ Map<String, BrokerLookupData> brokers = new HashMap<>(originalBrokers);
+ BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
+ Map<String, BrokerLookupData> result =
brokerVersionFilter.filter(brokers, context);
+ assertEquals(result, originalBrokers);
+ }
+
+ @Test
+ public void testFilter() throws BrokerFilterException {
+ Map<String, BrokerLookupData> originalBrokers = Map.of(
+ "localhost:6650", getLookupData("2.10.0"),
+ "localhost:6651", getLookupData("2.10.1"),
+ "localhost:6652", getLookupData("2.10.1"),
+ "localhost:6653", getLookupData("2.10.1")
+ );
+ BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
+ Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new
HashMap<>(originalBrokers), getContext());
+ assertEquals(result, Map.of(
+ "localhost:6651", getLookupData("2.10.1"),
+ "localhost:6652", getLookupData("2.10.1"),
+ "localhost:6653", getLookupData("2.10.1")
+ ));
+
+ originalBrokers = Map.of(
+ "localhost:6650", getLookupData("2.10.0"),
+ "localhost:6651", getLookupData("2.10.1-SNAPSHOT"),
+ "localhost:6652", getLookupData("2.10.1"),
+ "localhost:6653", getLookupData("2.10.1")
+ );
+ result = brokerVersionFilter.filter(new HashMap<>(originalBrokers),
getContext());
+
+ assertEquals(result, Map.of(
+ "localhost:6652", getLookupData("2.10.1"),
+ "localhost:6653", getLookupData("2.10.1")
+ ));
+
+ originalBrokers = Map.of(
+ "localhost:6650", getLookupData("2.10.0"),
+ "localhost:6651", getLookupData("2.10.1-SNAPSHOT"),
+ "localhost:6652", getLookupData("2.10.1"),
+ "localhost:6653", getLookupData("2.10.2-SNAPSHOT")
+ );
+
+ result = brokerVersionFilter.filter(new HashMap<>(originalBrokers),
getContext());
+ assertEquals(result, Map.of(
+ "localhost:6653", getLookupData("2.10.2-SNAPSHOT")
+ ));
+
+ }
+
+ @Test(expectedExceptions = BrokerFilterBadVersionException.class)
+ public void testInvalidVersionString() throws BrokerFilterException {
+ Map<String, BrokerLookupData> originalBrokers = Map.of(
+ "localhost:6650", getLookupData("xxx")
+ );
+ BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
+ brokerVersionFilter.filter(new HashMap<>(originalBrokers),
getContext());
+ }
+
+ public LoadManagerContext getContext() {
+ LoadManagerContext mockContext = mock(LoadManagerContext.class);
+ ServiceConfiguration configuration = new ServiceConfiguration();
+ configuration.setPreferLaterVersions(true);
+ doReturn(configuration).when(mockContext).brokerConfiguration();
+ return mockContext;
+ }
+
+ public BrokerLookupData getLookupData(String version) {
+ String webServiceUrl = "http://localhost:8080";
+ String webServiceUrlTls = "https://localhoss:8081";
+ String pulsarServiceUrl = "pulsar://localhost:6650";
+ String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
+ Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+ Map<String, String> protocols = new HashMap<>(){{
+ put("kafka", "9092");
+ }};
+ return new BrokerLookupData(
+ webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
+ pulsarServiceUrlTls, advertisedListeners, protocols, true,
true, version);
+ }
+}