This is an automated email from the ASF dual-hosted git repository.
dao-jun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b93fe9e78f3 [fix][bk] Fix NPE in IsolatedBookieEnsemblePlacementPolicy
when policy class does not match (#25825)
b93fe9e78f3 is described below
commit b93fe9e78f3929b52c33475aa2962a0450a7e2de
Author: Yike Xiao <[email protected]>
AuthorDate: Wed May 27 23:18:58 2026 +0800
[fix][bk] Fix NPE in IsolatedBookieEnsemblePlacementPolicy when policy
class does not match (#25825)
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
.../IsolatedBookieEnsemblePlacementPolicy.java | 29 +++--
.../IsolatedBookieEnsemblePlacementPolicyTest.java | 136 +++++++++++++++++++++
2 files changed, 153 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index fc850516462..80dee20c1ba 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.bookie.rackawareness;
+import static java.util.Collections.emptySet;
import static
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
@@ -166,11 +167,13 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
return Optional.empty();
}
- private static Pair<Set<String>, Set<String>> getIsolationGroup(
+ @VisibleForTesting
+ Pair<Set<String>, Set<String>> getIsolationGroup(
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) {
- MutablePair<Set<String>, Set<String>> pair = new MutablePair<>();
- String className =
IsolatedBookieEnsemblePlacementPolicy.class.getName();
- if
(ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) {
+ // Retain compatibility with ZkIsolatedBookieEnsemblePlacementPolicy
+ Class<?> policyClass = ensemblePlacementPolicyConfig.getPolicyClass();
+ if
(IsolatedBookieEnsemblePlacementPolicy.class.isAssignableFrom(policyClass)) {
+ MutablePair<Set<String>, Set<String>> pair = new
MutablePair<>(emptySet(), emptySet());
Map<String, Object> properties =
ensemblePlacementPolicyConfig.getProperties();
String primaryIsolationGroupString = ConfigurationStringUtil
.castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, ""));
@@ -178,21 +181,23 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
.castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
if (!primaryIsolationGroupString.isEmpty()) {
pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(",")));
- } else {
- pair.setLeft(Collections.emptySet());
}
if (!secondaryIsolationGroupString.isEmpty()) {
pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(",")));
- } else {
- pair.setRight(Collections.emptySet());
}
+ return pair;
+ } else {
+ log.info()
+ .attr("policyClass",
ensemblePlacementPolicyConfig.getPolicyClass().getName())
+ .log("The ensemble placement policy class is not
compatible with "
+ + "IsolatedBookieEnsemblePlacementPolicy, fallback
to use defaultIsolationGroups");
+ return defaultIsolationGroups;
}
- return pair;
}
@VisibleForTesting
Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
- Pair<Set<String>, Set<String>> isolationGroups) {
+ Pair<Set<String>,
Set<String>> isolationGroups) {
Set<BookieId> excludedBookies = new HashSet<>();
if (isolationGroups != null &&
isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
return excludedBookies;
@@ -215,8 +220,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
return excludedBookies;
}
int totalAvailableBookiesInPrimaryGroup = 0;
- Set<String> primaryIsolationGroup = Collections.emptySet();
- Set<String> secondaryIsolationGroup = Collections.emptySet();
+ Set<String> primaryIsolationGroup = emptySet();
+ Set<String> secondaryIsolationGroup = emptySet();
Set<BookieId> primaryGroupBookies = new HashSet<>();
if (isolationGroups != null) {
primaryIsolationGroup = isolationGroups.getLeft();
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 7940ccc636b..cd5041de459 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -43,12 +43,14 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -58,6 +60,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
+import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -839,6 +842,139 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
assertTrue(blacklist.isEmpty());
}
+ /**
+ * Regression test for the NPE reported in the stack trace below. When
custom metadata carries an
+ * {@link EnsemblePlacementPolicyConfig} whose policy class does NOT match
+ * {@link IsolatedBookieEnsemblePlacementPolicy}, the old {@code
getIsolationGroup()} returned a
+ * {@code MutablePair} with {@code null} left/right, which caused a {@link
NullPointerException} in
+ * {@code getExcludedBookiesWithIsolationGroups} when {@code
getLeft().contains(...)} was called.
+ *
+ * <pre>
+ * java.lang.NullPointerException: Cannot invoke
"java.util.Set.contains(Object)"
+ * because the return value of
"org.apache.commons.lang3.tuple.Pair.getLeft()" is null
+ * at
IsolatedBookieEnsemblePlacementPolicy.getExcludedBookiesWithIsolationGroups(...)
+ * at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookies(...)
+ * at IsolatedBookieEnsemblePlacementPolicy.replaceBookie(...)
+ * </pre>
+ */
+ @Test
+ public void testReplaceBookieWithNonMatchingPolicyClassShouldNotThrowNPE()
throws Exception {
+ Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+ Map<String, BookieInfo> group1 = new HashMap<>();
+ group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
+ group1.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
+ group1.put(BOOKIE3, BookieInfo.builder().rack("rack0").build());
+ group1.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
+ bookieMapping.put("group1", group1);
+
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
+
+ IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new
IsolatedBookieEnsemblePlacementPolicy();
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
"group1");
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL,
+ NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+ // Use a policy class that does NOT match
IsolatedBookieEnsemblePlacementPolicy.
+ // In the old code this caused getIsolationGroup() to return a
MutablePair with null left/right,
+ // triggering NPE at the getLeft().contains() call in
getExcludedBookiesWithIsolationGroups.
+ EnsemblePlacementPolicyConfig policyConfig = new
EnsemblePlacementPolicyConfig(
+ RackawareEnsemblePlacementPolicy.class,
Collections.emptyMap());
+ Map<String, byte[]> customMetadata = new HashMap<>();
+
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
policyConfig.encode());
+
+ BookieId bookie1Id = new BookieSocketAddress(BOOKIE1).toBookieId();
+ BookieId bookie2Id = new BookieSocketAddress(BOOKIE2).toBookieId();
+
+ // Must not throw NullPointerException; BKNotEnoughBookiesException is
acceptable.
+ isolationPolicy.replaceBookie(2, 2, 2, customMetadata,
+ Arrays.asList(bookie1Id, bookie2Id), bookie2Id, null);
+ }
+
+ /**
+ * Verifies that {@link
IsolatedBookieEnsemblePlacementPolicy#getIsolationGroup} treats
+ * {@link ZkIsolatedBookieEnsemblePlacementPolicy} (a subclass) exactly
like
+ * {@link IsolatedBookieEnsemblePlacementPolicy} itself when reading
isolation groups from
+ * {@link EnsemblePlacementPolicyConfig} properties.
+ *
+ * <p>Legacy Pulsar clusters may have persisted {@code
EnsemblePlacementPolicyConfig} entries whose
+ * {@code policyClass} field is set to {@code
ZkIsolatedBookieEnsemblePlacementPolicy}. The
+ * {@code isAssignableFrom} check in {@code getIsolationGroup} must
recognise this subclass so that
+ * the isolation groups are read from the stored properties rather than
falling back to the
+ * policy-level defaults.
+ */
+ @Test
+ public void testGetIsolationGroupWithZkCompatiblePolicyClass() throws
Exception {
+ // Group1 → default isolation group configured on the policy.
+ // Group2 → isolation group carried inside the custom metadata
(ZkIsolated class).
+ final String defaultGroup = "Group1";
+ final String customGroup = "Group2";
+
+ Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
+ Map<String, BookieInfo> group1 = new HashMap<>();
+ group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
+ group1.put(BOOKIE2, BookieInfo.builder().rack("rack0").build());
+ Map<String, BookieInfo> group2 = new HashMap<>();
+ group2.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
+ group2.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
+ bookieMapping.put(defaultGroup, group1);
+ bookieMapping.put(customGroup, group2);
+
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
+
+ IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new
IsolatedBookieEnsemblePlacementPolicy();
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
defaultGroup);
+ isolationPolicy.initialize(bkClientConf, Optional.empty(), timer,
SettableFeatureProvider.DISABLE_ALL,
+ NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
+
+ // --- unit-level: getIsolationGroup should parse properties, not fall
back to defaults ---
+ Map<String, Object> props = new HashMap<>();
+
props.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
customGroup);
+
props.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
"secondaryGroup");
+ EnsemblePlacementPolicyConfig zkConfig = new
EnsemblePlacementPolicyConfig(
+ ZkIsolatedBookieEnsemblePlacementPolicy.class, props);
+
+ Pair<Set<String>, Set<String>> groups =
isolationPolicy.getIsolationGroup(zkConfig);
+ assertEquals(groups.getLeft(), Sets.newHashSet(customGroup),
+ "primary group must be read from ZkIsolated config
properties");
+ assertEquals(groups.getRight(), Sets.newHashSet("secondaryGroup"),
+ "secondary group must be read from ZkIsolated config
properties");
+
+ // --- integration-level: newEnsemble must select bookies from the
ZkIsolated config group ---
+ Map<String, Object> placementPolicyProperties = new HashMap<>();
+
placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
customGroup);
+
placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
"");
+ EnsemblePlacementPolicyConfig policyConfig = new
EnsemblePlacementPolicyConfig(
+ ZkIsolatedBookieEnsemblePlacementPolicy.class,
placementPolicyProperties);
+ Map<String, byte[]> customMetadata = new HashMap<>();
+
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG,
policyConfig.encode());
+
+ Set<BookieId> bookieIdGroup2 = new HashSet<>();
+ bookieIdGroup2.add(new BookieSocketAddress(BOOKIE3).toBookieId());
+ bookieIdGroup2.add(new BookieSocketAddress(BOOKIE4).toBookieId());
+
+ List<BookieId> ensemble = isolationPolicy
+ .newEnsemble(2, 2, 2, customMetadata, new
HashSet<>()).getResult();
+ assertTrue(bookieIdGroup2.containsAll(ensemble),
+ "ensemble should come from " + customGroup + " (ZkIsolated
config), got " + ensemble);
+
+ // Sanity-check: without custom metadata the default group1 bookies
are chosen.
+ Set<BookieId> bookieIdGroup1 = new HashSet<>();
+ bookieIdGroup1.add(new BookieSocketAddress(BOOKIE1).toBookieId());
+ bookieIdGroup1.add(new BookieSocketAddress(BOOKIE2).toBookieId());
+ List<BookieId> defaultEnsemble = isolationPolicy
+ .newEnsemble(2, 2, 2, Collections.emptyMap(), new
HashSet<>()).getResult();
+ assertTrue(bookieIdGroup1.containsAll(defaultEnsemble),
+ "default ensemble should come from " + defaultGroup + ", got "
+ defaultEnsemble);
+ }
+
// The policy gets the bookie info asynchronously before each query or
update, when putting the bookie info into
// the metadata store, the cache needs some time to receive the
notification and update accordingly.
private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy
isolationPolicy, byte[] bookieInfo) {