This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 879cc01697e80cab65feeaccef7189e48c45751f Author: Yike Xiao <[email protected]> AuthorDate: Wed May 27 23:18:58 2026 +0800 [fix][bk] Fix NPE in IsolatedBookieEnsemblePlacementPolicy when policy class does not match (#25825) Co-authored-by: Claude Sonnet 4.6 <[email protected]> (cherry picked from commit b93fe9e78f3929b52c33475aa2962a0450a7e2de) --- .../IsolatedBookieEnsemblePlacementPolicy.java | 28 +++-- .../IsolatedBookieEnsemblePlacementPolicyTest.java | 136 +++++++++++++++++++++ 2 files changed, 152 insertions(+), 12 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 4ef1c594be4..f2c1b5e7361 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.bookie.rackawareness; +import static java.util.Collections.emptySet; import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; @@ -164,11 +165,13 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac return Optional.empty(); } - private static Pair<Set<String>, Set<String>> getIsolationGroup( + @VisibleForTesting + Pair<Set<String>, Set<String>> getIsolationGroup( EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) { - MutablePair<Set<String>, Set<String>> pair = new MutablePair<>(); - String className = IsolatedBookieEnsemblePlacementPolicy.class.getName(); - if (ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) { + // Retain compatibility with ZkIsolatedBookieEnsemblePlacementPolicy + Class<?> policyClass = ensemblePlacementPolicyConfig.getPolicyClass(); + if (IsolatedBookieEnsemblePlacementPolicy.class.isAssignableFrom(policyClass)) { + MutablePair<Set<String>, Set<String>> pair = new MutablePair<>(emptySet(), emptySet()); Map<String, Object> properties = ensemblePlacementPolicyConfig.getProperties(); String primaryIsolationGroupString = ConfigurationStringUtil .castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, "")); @@ -176,21 +179,22 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac .castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, "")); if (!primaryIsolationGroupString.isEmpty()) { pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(","))); - } else { - pair.setLeft(Collections.emptySet()); } if (!secondaryIsolationGroupString.isEmpty()) { pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(","))); - } else { - pair.setRight(Collections.emptySet()); } + return pair; + } else { + log.info("The ensemble placement policy class [{}] is not compatible with " + + "IsolatedBookieEnsemblePlacementPolicy, fallback to use defaultIsolationGroups", + ensemblePlacementPolicyConfig.getPolicyClass().getName()); + return defaultIsolationGroups; } - return pair; } @VisibleForTesting Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize, - Pair<Set<String>, Set<String>> isolationGroups) { + Pair<Set<String>, Set<String>> isolationGroups) { Set<BookieId> excludedBookies = new HashSet<>(); if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) { return excludedBookies; @@ -213,8 +217,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac return excludedBookies; } int totalAvailableBookiesInPrimaryGroup = 0; - Set<String> primaryIsolationGroup = Collections.emptySet(); - Set<String> secondaryIsolationGroup = Collections.emptySet(); + Set<String> primaryIsolationGroup = emptySet(); + Set<String> secondaryIsolationGroup = emptySet(); Set<BookieId> primaryGroupBookies = new HashSet<>(); if (isolationGroups != null) { primaryIsolationGroup = isolationGroups.getLeft(); diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index 0dc996c7d7d..fc67395c813 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -42,12 +42,14 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; +import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.SettableFeatureProvider; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; @@ -57,6 +59,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; +import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -844,6 +847,139 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { assertTrue(blacklist.isEmpty()); } + /** + * Regression test for the NPE reported in the stack trace below. When custom metadata carries an + * {@link EnsemblePlacementPolicyConfig} whose policy class does NOT match + * {@link IsolatedBookieEnsemblePlacementPolicy}, the old {@code getIsolationGroup()} returned a + * {@code MutablePair} with {@code null} left/right, which caused a {@link NullPointerException} in + * {@code getExcludedBookiesWithIsolationGroups} when {@code getLeft().contains(...)} was called. + * + * <pre> + * java.lang.NullPointerException: Cannot invoke "java.util.Set.contains(Object)" + * because the return value of "org.apache.commons.lang3.tuple.Pair.getLeft()" is null + * at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookiesWithIsolationGroups(...) + * at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookies(...) + * at IsolatedBookieEnsemblePlacementPolicy.replaceBookie(...) + * </pre> + */ + @Test + public void testReplaceBookieWithNonMatchingPolicyClassShouldNotThrowNPE() throws Exception { + Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>(); + Map<String, BookieInfo> group1 = new HashMap<>(); + group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build()); + group1.put(BOOKIE2, BookieInfo.builder().rack("rack1").build()); + group1.put(BOOKIE3, BookieInfo.builder().rack("rack0").build()); + group1.put(BOOKIE4, BookieInfo.builder().rack("rack1").build()); + bookieMapping.put("group1", group1); + + store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), + Optional.empty()).join(); + + IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy(); + ClientConfiguration bkClientConf = new ClientConfiguration(); + bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store); + bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "group1"); + isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, + NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies); + + // Use a policy class that does NOT match IsolatedBookieEnsemblePlacementPolicy. + // In the old code this caused getIsolationGroup() to return a MutablePair with null left/right, + // triggering NPE at the getLeft().contains() call in getExcludedBookiesWithIsolationGroups. + EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig( + RackawareEnsemblePlacementPolicy.class, Collections.emptyMap()); + Map<String, byte[]> customMetadata = new HashMap<>(); + customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode()); + + BookieId bookie1Id = new BookieSocketAddress(BOOKIE1).toBookieId(); + BookieId bookie2Id = new BookieSocketAddress(BOOKIE2).toBookieId(); + + // Must not throw NullPointerException; BKNotEnoughBookiesException is acceptable. + isolationPolicy.replaceBookie(2, 2, 2, customMetadata, + Arrays.asList(bookie1Id, bookie2Id), bookie2Id, null); + } + + /** + * Verifies that {@link IsolatedBookieEnsemblePlacementPolicy#getIsolationGroup} treats + * {@link ZkIsolatedBookieEnsemblePlacementPolicy} (a subclass) exactly like + * {@link IsolatedBookieEnsemblePlacementPolicy} itself when reading isolation groups from + * {@link EnsemblePlacementPolicyConfig} properties. + * + * <p>Legacy Pulsar clusters may have persisted {@code EnsemblePlacementPolicyConfig} entries whose + * {@code policyClass} field is set to {@code ZkIsolatedBookieEnsemblePlacementPolicy}. The + * {@code isAssignableFrom} check in {@code getIsolationGroup} must recognise this subclass so that + * the isolation groups are read from the stored properties rather than falling back to the + * policy-level defaults. + */ + @Test + public void testGetIsolationGroupWithZkCompatiblePolicyClass() throws Exception { + // Group1 → default isolation group configured on the policy. + // Group2 → isolation group carried inside the custom metadata (ZkIsolated class). + final String defaultGroup = "Group1"; + final String customGroup = "Group2"; + + Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>(); + Map<String, BookieInfo> group1 = new HashMap<>(); + group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build()); + group1.put(BOOKIE2, BookieInfo.builder().rack("rack0").build()); + Map<String, BookieInfo> group2 = new HashMap<>(); + group2.put(BOOKIE3, BookieInfo.builder().rack("rack1").build()); + group2.put(BOOKIE4, BookieInfo.builder().rack("rack1").build()); + bookieMapping.put(defaultGroup, group1); + bookieMapping.put(customGroup, group2); + + store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), + Optional.empty()).join(); + + IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy(); + ClientConfiguration bkClientConf = new ClientConfiguration(); + bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store); + bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, defaultGroup); + isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, + NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies); + + // --- unit-level: getIsolationGroup should parse properties, not fall back to defaults --- + Map<String, Object> props = new HashMap<>(); + props.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, customGroup); + props.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "secondaryGroup"); + EnsemblePlacementPolicyConfig zkConfig = new EnsemblePlacementPolicyConfig( + ZkIsolatedBookieEnsemblePlacementPolicy.class, props); + + Pair<Set<String>, Set<String>> groups = isolationPolicy.getIsolationGroup(zkConfig); + assertEquals(groups.getLeft(), Sets.newHashSet(customGroup), + "primary group must be read from ZkIsolated config properties"); + assertEquals(groups.getRight(), Sets.newHashSet("secondaryGroup"), + "secondary group must be read from ZkIsolated config properties"); + + // --- integration-level: newEnsemble must select bookies from the ZkIsolated config group --- + Map<String, Object> placementPolicyProperties = new HashMap<>(); + placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, customGroup); + placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, ""); + EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig( + ZkIsolatedBookieEnsemblePlacementPolicy.class, placementPolicyProperties); + Map<String, byte[]> customMetadata = new HashMap<>(); + customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode()); + + Set<BookieId> bookieIdGroup2 = new HashSet<>(); + bookieIdGroup2.add(new BookieSocketAddress(BOOKIE3).toBookieId()); + bookieIdGroup2.add(new BookieSocketAddress(BOOKIE4).toBookieId()); + + List<BookieId> ensemble = isolationPolicy + .newEnsemble(2, 2, 2, customMetadata, new HashSet<>()).getResult(); + assertTrue(bookieIdGroup2.containsAll(ensemble), + "ensemble should come from " + customGroup + " (ZkIsolated config), got " + ensemble); + + // Sanity-check: without custom metadata the default group1 bookies are chosen. + Set<BookieId> bookieIdGroup1 = new HashSet<>(); + bookieIdGroup1.add(new BookieSocketAddress(BOOKIE1).toBookieId()); + bookieIdGroup1.add(new BookieSocketAddress(BOOKIE2).toBookieId()); + List<BookieId> defaultEnsemble = isolationPolicy + .newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult(); + assertTrue(bookieIdGroup1.containsAll(defaultEnsemble), + "default ensemble should come from " + defaultGroup + ", got " + defaultEnsemble); + } + // The policy gets the bookie info asynchronously before each query or update, when putting the bookie info into // the metadata store, the cache needs some time to receive the notification and update accordingly. private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) {
