This is an automated email from the ASF dual-hosted git repository.

dao-jun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b93fe9e78f3 [fix][bk] Fix NPE in IsolatedBookieEnsemblePlacementPolicy 
when policy class does not match (#25825)
b93fe9e78f3 is described below

commit b93fe9e78f3929b52c33475aa2962a0450a7e2de
Author: Yike Xiao <[email protected]>
AuthorDate: Wed May 27 23:18:58 2026 +0800

    [fix][bk] Fix NPE in IsolatedBookieEnsemblePlacementPolicy when policy 
class does not match (#25825)
    
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 .../IsolatedBookieEnsemblePlacementPolicy.java     |  29 +++--
 .../IsolatedBookieEnsemblePlacementPolicyTest.java | 136 +++++++++++++++++++++
 2 files changed, 153 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index fc850516462..80dee20c1ba 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.bookie.rackawareness;
 
+import static java.util.Collections.emptySet;
 import static 
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
@@ -166,11 +167,13 @@ public class IsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePlac
         return Optional.empty();
     }
 
-    private static Pair<Set<String>, Set<String>> getIsolationGroup(
+    @VisibleForTesting
+    Pair<Set<String>, Set<String>> getIsolationGroup(
             EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) {
-        MutablePair<Set<String>, Set<String>> pair = new MutablePair<>();
-        String className = 
IsolatedBookieEnsemblePlacementPolicy.class.getName();
-        if 
(ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) {
+        // Retain compatibility with ZkIsolatedBookieEnsemblePlacementPolicy
+        Class<?> policyClass = ensemblePlacementPolicyConfig.getPolicyClass();
+        if 
(IsolatedBookieEnsemblePlacementPolicy.class.isAssignableFrom(policyClass)) {
+            MutablePair<Set<String>, Set<String>> pair = new 
MutablePair<>(emptySet(), emptySet());
             Map<String, Object> properties = 
ensemblePlacementPolicyConfig.getProperties();
             String primaryIsolationGroupString = ConfigurationStringUtil
                     
.castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, ""));
@@ -178,21 +181,23 @@ public class IsolatedBookieEnsemblePlacementPolicy 
extends RackawareEnsemblePlac
                     
.castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
             if (!primaryIsolationGroupString.isEmpty()) {
                 
pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(",")));
-            } else {
-                pair.setLeft(Collections.emptySet());
             }
             if (!secondaryIsolationGroupString.isEmpty()) {
                 
pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(",")));
-            } else {
-                pair.setRight(Collections.emptySet());
             }
+            return pair;
+        } else {
+            log.info()
+                    .attr("policyClass", 
ensemblePlacementPolicyConfig.getPolicyClass().getName())
+                    .log("The ensemble placement policy class is not 
compatible with "
+                            + "IsolatedBookieEnsemblePlacementPolicy, fallback 
to use defaultIsolationGroups");
+            return defaultIsolationGroups;
         }
-        return pair;
     }
 
     @VisibleForTesting
     Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
-        Pair<Set<String>, Set<String>> isolationGroups) {
+                                                        Pair<Set<String>, 
Set<String>> isolationGroups) {
         Set<BookieId> excludedBookies = new HashSet<>();
         if (isolationGroups != null && 
isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
             return excludedBookies;
@@ -215,8 +220,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlac
                     return excludedBookies;
                 }
                 int totalAvailableBookiesInPrimaryGroup = 0;
-                Set<String> primaryIsolationGroup = Collections.emptySet();
-                Set<String> secondaryIsolationGroup = Collections.emptySet();
+                Set<String> primaryIsolationGroup = emptySet();
+                Set<String> secondaryIsolationGroup = emptySet();
                 Set<BookieId> primaryGroupBookies = new HashSet<>();
                 if (isolationGroups != null) {
                     primaryIsolationGroup = isolationGroups.getLeft();
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 7940ccc636b..cd5041de459 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -43,12 +43,14 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -58,6 +60,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
+import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -839,6 +842,139 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         assertTrue(blacklist.isEmpty());
     }
 
+    /**
+     * Regression test for the NPE reported in the stack trace below. When 
custom metadata carries an
+     * {@link EnsemblePlacementPolicyConfig} whose policy class does NOT match
+     * {@link IsolatedBookieEnsemblePlacementPolicy}, the old {@code 
getIsolationGroup()} returned a
+     * {@code MutablePair} with {@code null} left/right, which caused a {@link 
NullPointerException} in
+     * {@code getExcludedBookiesWithIsolationGroups} when {@code 
getLeft().contains(...)} was called.
+     *
+     * <pre>
+     * java.lang.NullPointerException: Cannot invoke 
"java.util.Set.contains(Object)"
+     *     because the return value of 
"org.apache.commons.lang3.tuple.Pair.getLeft()" is null
+     *     at 
IsolatedBookieEnsemblePlacementPolicy.getExcludedBookiesWithIsolationGroups(...)
+     *     at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookies(...)
+     *     at IsolatedBookieEnsemblePlacementPolicy.replaceBookie(...)
+     * </pre>
+     */
+    @Test
+    public void testReplaceBookieWithNonMatchingPolicyClassShouldNotThrowNPE() 
throws Exception {
+        Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+        Map<String, BookieInfo> group1 = new HashMap<>();
+        group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
+        group1.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
+        group1.put(BOOKIE3, BookieInfo.builder().rack("rack0").build());
+        group1.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
+        bookieMapping.put("group1", group1);
+
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
+
+        IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new 
IsolatedBookieEnsemblePlacementPolicy();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, 
store);
+        
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 "group1");
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL,
+                NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+        // Use a policy class that does NOT match 
IsolatedBookieEnsemblePlacementPolicy.
+        // In the old code this caused getIsolationGroup() to return a 
MutablePair with null left/right,
+        // triggering NPE at the getLeft().contains() call in 
getExcludedBookiesWithIsolationGroups.
+        EnsemblePlacementPolicyConfig policyConfig = new 
EnsemblePlacementPolicyConfig(
+                RackawareEnsemblePlacementPolicy.class, 
Collections.emptyMap());
+        Map<String, byte[]> customMetadata = new HashMap<>();
+        
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
 policyConfig.encode());
+
+        BookieId bookie1Id = new BookieSocketAddress(BOOKIE1).toBookieId();
+        BookieId bookie2Id = new BookieSocketAddress(BOOKIE2).toBookieId();
+
+        // Must not throw NullPointerException; BKNotEnoughBookiesException is 
acceptable.
+        isolationPolicy.replaceBookie(2, 2, 2, customMetadata,
+                Arrays.asList(bookie1Id, bookie2Id), bookie2Id, null);
+    }
+
+    /**
+     * Verifies that {@link 
IsolatedBookieEnsemblePlacementPolicy#getIsolationGroup} treats
+     * {@link ZkIsolatedBookieEnsemblePlacementPolicy} (a subclass) exactly 
like
+     * {@link IsolatedBookieEnsemblePlacementPolicy} itself when reading 
isolation groups from
+     * {@link EnsemblePlacementPolicyConfig} properties.
+     *
+     * <p>Legacy Pulsar clusters may have persisted {@code 
EnsemblePlacementPolicyConfig} entries whose
+     * {@code policyClass} field is set to {@code 
ZkIsolatedBookieEnsemblePlacementPolicy}.  The
+     * {@code isAssignableFrom} check in {@code getIsolationGroup} must 
recognise this subclass so that
+     * the isolation groups are read from the stored properties rather than 
falling back to the
+     * policy-level defaults.
+     */
+    @Test
+    public void testGetIsolationGroupWithZkCompatiblePolicyClass() throws 
Exception {
+        // Group1 → default isolation group configured on the policy.
+        // Group2 → isolation group carried inside the custom metadata 
(ZkIsolated class).
+        final String defaultGroup = "Group1";
+        final String customGroup = "Group2";
+
+        Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+        Map<String, BookieInfo> group1 = new HashMap<>();
+        group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
+        group1.put(BOOKIE2, BookieInfo.builder().rack("rack0").build());
+        Map<String, BookieInfo> group2 = new HashMap<>();
+        group2.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
+        group2.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
+        bookieMapping.put(defaultGroup, group1);
+        bookieMapping.put(customGroup, group2);
+
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
+
+        IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new 
IsolatedBookieEnsemblePlacementPolicy();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, 
store);
+        
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 defaultGroup);
+        isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, 
SettableFeatureProvider.DISABLE_ALL,
+                NullStatsLogger.INSTANCE, 
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+        // --- unit-level: getIsolationGroup should parse properties, not fall 
back to defaults ---
+        Map<String, Object> props = new HashMap<>();
+        
props.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, 
customGroup);
+        
props.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
 "secondaryGroup");
+        EnsemblePlacementPolicyConfig zkConfig = new 
EnsemblePlacementPolicyConfig(
+                ZkIsolatedBookieEnsemblePlacementPolicy.class, props);
+
+        Pair<Set<String>, Set<String>> groups = 
isolationPolicy.getIsolationGroup(zkConfig);
+        assertEquals(groups.getLeft(), Sets.newHashSet(customGroup),
+                "primary group must be read from ZkIsolated config 
properties");
+        assertEquals(groups.getRight(), Sets.newHashSet("secondaryGroup"),
+                "secondary group must be read from ZkIsolated config 
properties");
+
+        // --- integration-level: newEnsemble must select bookies from the 
ZkIsolated config group ---
+        Map<String, Object> placementPolicyProperties = new HashMap<>();
+        
placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
 customGroup);
+        
placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
 "");
+        EnsemblePlacementPolicyConfig policyConfig = new 
EnsemblePlacementPolicyConfig(
+                ZkIsolatedBookieEnsemblePlacementPolicy.class, 
placementPolicyProperties);
+        Map<String, byte[]> customMetadata = new HashMap<>();
+        
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
 policyConfig.encode());
+
+        Set<BookieId> bookieIdGroup2 = new HashSet<>();
+        bookieIdGroup2.add(new BookieSocketAddress(BOOKIE3).toBookieId());
+        bookieIdGroup2.add(new BookieSocketAddress(BOOKIE4).toBookieId());
+
+        List<BookieId> ensemble = isolationPolicy
+                .newEnsemble(2, 2, 2, customMetadata, new 
HashSet<>()).getResult();
+        assertTrue(bookieIdGroup2.containsAll(ensemble),
+                "ensemble should come from " + customGroup + " (ZkIsolated 
config), got " + ensemble);
+
+        // Sanity-check: without custom metadata the default group1 bookies 
are chosen.
+        Set<BookieId> bookieIdGroup1 = new HashSet<>();
+        bookieIdGroup1.add(new BookieSocketAddress(BOOKIE1).toBookieId());
+        bookieIdGroup1.add(new BookieSocketAddress(BOOKIE2).toBookieId());
+        List<BookieId> defaultEnsemble = isolationPolicy
+                .newEnsemble(2, 2, 2, Collections.emptyMap(), new 
HashSet<>()).getResult();
+        assertTrue(bookieIdGroup1.containsAll(defaultEnsemble),
+                "default ensemble should come from " + defaultGroup + ", got " 
+ defaultEnsemble);
+    }
+
     // The policy gets the bookie info asynchronously before each query or 
update, when putting the bookie info into
     // the metadata store, the cache needs some time to receive the 
notification and update accordingly.
     private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy 
isolationPolicy, byte[] bookieInfo) {

Reply via email to