This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7449baa6e53 [improve][broker] PIP-192: Implement broker version filter 
for new load manager (#19023)
7449baa6e53 is described below

commit 7449baa6e53de5c5c1135c6d2e953bf4dcdfd3da
Author: Kai Wang <[email protected]>
AuthorDate: Sat Feb 4 17:30:30 2023 +0800

    [improve][broker] PIP-192: Implement broker version filter for new load 
manager (#19023)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |   2 +
 .../extensions/filter/BrokerFilter.java            |   2 +-
 .../extensions/filter/BrokerVersionFilter.java     | 147 +++++++++++++++++++++
 .../extensions/filter/BrokerVersionFilterTest.java | 140 ++++++++++++++++++++
 4 files changed, 290 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index d95bacd157e..66c271ab22e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -38,6 +38,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import 
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
 import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
@@ -95,6 +96,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
      */
     public ExtensibleLoadManagerImpl() {
         this.brokerFilterPipeline = new ArrayList<>();
+        this.brokerFilterPipeline.add(new BrokerVersionFilter());
         // TODO: Make brokerSelectionStrategy configurable.
         this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
index 0a76446d3ce..35f4b6817f1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -36,7 +36,7 @@ public interface BrokerFilter {
     /**
      * Filter out unqualified brokers based on implementation.
      *
-     * @param brokers The full brokers.
+     * @param brokers The full broker and lookup data.
      * @param context The load manager context.
      * @return Filtered broker list.
      */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
new file mode 100644
index 00000000000..869fb049a3c
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import com.github.zafarkhaja.semver.Version;
+import java.util.Iterator;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+
+/**
+ * Filter by broker version.
+ */
+@Slf4j
+public class BrokerVersionFilter implements BrokerFilter {
+
+    public static final String FILTER_NAME = "broker_version_filter";
+
+
+    /**
+     * From the given set of available broker candidates, filter those old 
brokers using the version numbers.
+     *
+     * @param brokers The currently available brokers that have not already 
been filtered.
+     * @param context The load manager context.
+     *
+     */
+    @Override
+    public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> 
brokers, LoadManagerContext context)
+            throws BrokerFilterException {
+        ServiceConfiguration conf = context.brokerConfiguration();
+        if (!conf.isPreferLaterVersions() || brokers.isEmpty()) {
+            return brokers;
+        }
+
+        Version latestVersion;
+        try {
+            latestVersion = getLatestVersionNumber(brokers);
+            if (log.isDebugEnabled()) {
+                log.debug("Latest broker version found was [{}]", 
latestVersion);
+            }
+        } catch (Exception ex) {
+            log.warn("Disabling PreferLaterVersions feature; reason: " + 
ex.getMessage());
+            throw new BrokerFilterBadVersionException("Cannot determine newest 
broker version: " + ex.getMessage());
+        }
+
+        int numBrokersLatestVersion = 0;
+        int numBrokersOlderVersion = 0;
+
+        Iterator<Map.Entry<String, BrokerLookupData>> brokerIterator = 
brokers.entrySet().iterator();
+        while (brokerIterator.hasNext()) {
+            Map.Entry<String, BrokerLookupData> next = brokerIterator.next();
+            String brokerId = next.getKey();
+            String version = next.getValue().brokerVersion();
+            Version brokerVersionVersion = Version.valueOf(version);
+            if (brokerVersionVersion.equals(latestVersion)) {
+                log.debug("Broker [{}] is running the latest version ([{}])", 
brokerId, version);
+                numBrokersLatestVersion++;
+            } else {
+                log.info("Broker [{}] is running an older version ([{}]); 
latest version is [{}]",
+                        brokerId, version, latestVersion);
+                numBrokersOlderVersion++;
+                brokerIterator.remove();
+            }
+        }
+        if (numBrokersOlderVersion == 0) {
+            log.info("All {} brokers are running the latest version [{}]", 
numBrokersLatestVersion, latestVersion);
+        }
+        return brokers;
+    }
+
+    /**
+     * Get the most recent broker version number from the broker lookup data 
of all the running brokers.
+     * The version number is from the build artifact in the pom and got added 
to the package when it was built by Maven
+     *
+     * @param brokerMap
+     *             The BrokerId -> BrokerLookupData Map.
+     * @return The most recent broker version
+     * @throws BrokerFilterBadVersionException
+     *            If the most recent version is undefined (e.g., a bad broker 
version was encountered or a broker
+     *            does not have a version string in its lookup data.
+     */
+    public Version getLatestVersionNumber(Map<String, BrokerLookupData> 
brokerMap)
+            throws BrokerFilterBadVersionException {
+
+        if (brokerMap.size() == 0) {
+            throw new BrokerFilterBadVersionException(
+                    "Unable to determine latest version since broker version 
map was empty");
+        }
+
+        Version latestVersion = null;
+        for (Map.Entry<String, BrokerLookupData> entry : brokerMap.entrySet()) 
{
+            String brokerId = entry.getKey();
+            String version = entry.getValue().brokerVersion();
+            if (null == version || version.length() == 0) {
+                log.warn("No version string in lookup data for broker [{}]; 
disabling PreferLaterVersions feature",
+                        brokerId);
+                // Trigger the load manager to reset all the brokers to the 
original set
+                throw new BrokerFilterBadVersionException("No version string 
in lookup data for broker \""
+                        + brokerId + "\"");
+            }
+            Version brokerVersionVersion;
+            try {
+                brokerVersionVersion = Version.valueOf(version);
+            } catch (Exception x) {
+                log.warn("Invalid version string in lookup data for broker 
[{}]: [{}];"
+                                + " disabling PreferLaterVersions feature",
+                        brokerId, version);
+                // Trigger the load manager to reset all the brokers to the 
original set
+                throw new BrokerFilterBadVersionException("Invalid version 
string in lookup data for broker \""
+                        + brokerId + "\": \"" + version + "\")");
+            }
+
+            if (latestVersion == null) {
+                latestVersion = brokerVersionVersion;
+            } else if (Version.BUILD_AWARE_ORDER.compare(latestVersion, 
brokerVersionVersion) < 0) {
+                latestVersion = brokerVersionVersion;
+            }
+        }
+
+        return latestVersion;
+    }
+
+    @Override
+    public String name() {
+        return FILTER_NAME;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
new file mode 100644
index 00000000000..1fcc3836a6f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link BrokerVersionFilter}.
+ */
+@Test(groups = "broker")
+public class BrokerVersionFilterTest {
+
+
+    @Test
+    public void testFilterEmptyBrokerList() throws BrokerFilterException {
+        BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
+        Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new 
HashMap<>(), getContext());
+        assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void testDisabledFilter() throws BrokerFilterException {
+        LoadManagerContext context = getContext();
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        configuration.setPreferLaterVersions(false);
+        doReturn(configuration).when(context).brokerConfiguration();
+
+        Map<String, BrokerLookupData> originalBrokers = Map.of(
+                "localhost:6650", getLookupData("2.10.0"),
+                "localhost:6651", getLookupData("2.10.1")
+        );
+        Map<String, BrokerLookupData> brokers = new HashMap<>(originalBrokers);
+        BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
+        Map<String, BrokerLookupData> result = 
brokerVersionFilter.filter(brokers, context);
+        assertEquals(result, originalBrokers);
+    }
+
+    @Test
+    public void testFilter() throws BrokerFilterException {
+        Map<String, BrokerLookupData> originalBrokers = Map.of(
+                "localhost:6650", getLookupData("2.10.0"),
+                "localhost:6651", getLookupData("2.10.1"),
+                "localhost:6652", getLookupData("2.10.1"),
+                "localhost:6653", getLookupData("2.10.1")
+        );
+        BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
+        Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new 
HashMap<>(originalBrokers), getContext());
+        assertEquals(result, Map.of(
+                "localhost:6651", getLookupData("2.10.1"),
+                "localhost:6652", getLookupData("2.10.1"),
+                "localhost:6653", getLookupData("2.10.1")
+        ));
+
+        originalBrokers = Map.of(
+                "localhost:6650", getLookupData("2.10.0"),
+                "localhost:6651", getLookupData("2.10.1-SNAPSHOT"),
+                "localhost:6652", getLookupData("2.10.1"),
+                "localhost:6653", getLookupData("2.10.1")
+        );
+        result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
getContext());
+
+        assertEquals(result, Map.of(
+                "localhost:6652", getLookupData("2.10.1"),
+                "localhost:6653", getLookupData("2.10.1")
+        ));
+
+        originalBrokers = Map.of(
+                "localhost:6650", getLookupData("2.10.0"),
+                "localhost:6651", getLookupData("2.10.1-SNAPSHOT"),
+                "localhost:6652", getLookupData("2.10.1"),
+                "localhost:6653", getLookupData("2.10.2-SNAPSHOT")
+        );
+
+        result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
getContext());
+        assertEquals(result, Map.of(
+                "localhost:6653", getLookupData("2.10.2-SNAPSHOT")
+        ));
+
+    }
+
+    @Test(expectedExceptions = BrokerFilterBadVersionException.class)
+    public void testInvalidVersionString() throws BrokerFilterException {
+        Map<String, BrokerLookupData> originalBrokers = Map.of(
+                "localhost:6650", getLookupData("xxx")
+        );
+        BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
+        brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
getContext());
+    }
+
+    public LoadManagerContext getContext() {
+        LoadManagerContext mockContext = mock(LoadManagerContext.class);
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        configuration.setPreferLaterVersions(true);
+        doReturn(configuration).when(mockContext).brokerConfiguration();
+        return mockContext;
+    }
+
+    public BrokerLookupData getLookupData(String version) {
+        String webServiceUrl = "http://localhost:8080";;
+        String webServiceUrlTls = "https://localhoss:8081";;
+        String pulsarServiceUrl = "pulsar://localhost:6650";
+        String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
+        Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+        Map<String, String> protocols = new HashMap<>(){{
+            put("kafka", "9092");
+        }};
+        return new BrokerLookupData(
+                webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
+                pulsarServiceUrlTls, advertisedListeners, protocols, true, 
true, version);
+    }
+}

Reply via email to