This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 76a2394022b [fix][broker] Force EnsemblePolicies to resolve network
location after rackInfoMap is updated due to changes in /ledgers/available
znode (#25067)
76a2394022b is described below
commit 76a2394022b0f02ec0cbf823f72656826174b265
Author: Malla Sandeep <[email protected]>
AuthorDate: Fri Dec 12 11:25:53 2025 +0530
[fix][broker] Force EnsemblePolicies to resolve network location after
rackInfoMap is updated due to changes in /ledgers/available znode (#25067)
---
.../rackawareness/BookieRackAffinityMapping.java | 26 +++-
.../BookieRackAffinityMappingTest.java | 143 ++++++++++++++++++---
2 files changed, 146 insertions(+), 23 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index 2df4dc22c14..d1662100e3e 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -22,6 +22,7 @@ import static
org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METAD
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -64,7 +65,7 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
public static final String METADATA_STORE_INSTANCE =
"METADATA_STORE_INSTANCE";
private MetadataCache<BookiesRackConfiguration> bookieMappingCache = null;
- private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy
= null;
+ private volatile ITopologyAwareEnsemblePlacementPolicy<BookieNode>
rackawarePolicy = null;
private List<BookieId> bookieAddressListLastTime = new ArrayList<>();
private BookiesRackConfiguration racksWithHost = new
BookiesRackConfiguration();
@@ -157,7 +158,7 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
registrationClient.watchWritableBookies(versioned -> {
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenApply(optRes ->
optRes.orElseGet(BookiesRackConfiguration::new))
- .thenAccept(this::updateRacksWithHost)
+ .thenApply(this::processRackUpdate)
.exceptionally(ex -> {
LOG.error("Failed to update rack info. ", ex);
return null;
@@ -169,6 +170,17 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
}
}
+    /**
+     * Applies a newly loaded rack configuration and then notifies the registered rack-change listener.
+     *
+     * <p>The rack mapping is refreshed and the last known bookie list is copied while holding this
+     * object's monitor; the listener callback itself runs after the monitor has been released.
+     *
+     * @param racks the rack configuration to apply
+     * @return always {@code null}; the {@code Void} return type lets the method be used with {@code thenApply}
+     */
+    private Void processRackUpdate(BookiesRackConfiguration racks) {
+        final List<BookieId> bookiesSnapshot;
+        synchronized (this) {
+            updateRacksWithHost(racks);
+            bookiesSnapshot = new ArrayList<>(bookieAddressListLastTime);
+        }
+        // Notify ensemble placement policy after rack info is updated to ensure consistent state.
+        rackChangeListenerCallback(bookiesSnapshot);
+        return null;
+    }
+
private synchronized void updateRacksWithHost(BookiesRackConfiguration
racks) {
// In config z-node, the bookies are added in the `ip:port` notation,
while BK will ask
// for just the IP/hostname when trying to get the rack for a bookie.
@@ -274,12 +286,16 @@ public class BookieRackAffinityMapping extends
AbstractDNSToSwitchMapping
bookieIdSet.addAll(bookieAddressListLastTime);
bookieAddressListLastTime = bookieAddressList;
}
- if (rackawarePolicy != null) {
- rackawarePolicy.onBookieRackChange(new
ArrayList<>(bookieIdSet));
- }
+ rackChangeListenerCallback(bookieIdSet);
});
}
+    /**
+     * Forwards a rack change for the given bookies to the registered placement policy, if one is set.
+     *
+     * @param bookieIdSet the bookies to report; the policy receives a fresh {@code ArrayList} copy
+     */
+    private void rackChangeListenerCallback(Collection<BookieId> bookieIdSet) {
+        // Read the volatile field once so the null check and the invocation use the same instance.
+        final ITopologyAwareEnsemblePlacementPolicy<BookieNode> policy = rackawarePolicy;
+        if (policy != null) {
+            policy.onBookieRackChange(new ArrayList<>(bookieIdSet));
+        }
+    }
+
@Override
public void
registerRackChangeListener(ITopologyAwareEnsemblePlacementPolicy<BookieNode>
rackawarePolicy) {
this.rackawarePolicy = rackawarePolicy;
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index 46466dc6f97..a8f39917869 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.bookie.rackawareness;
import static
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
+import static
org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -37,6 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -56,9 +60,12 @@ import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
@@ -98,7 +105,7 @@ public class BookieRackAffinityMappingTest {
String data = "{\"group1\": {\"" + bookie1
+ "\": {\"rack\": \"/rack0\", \"hostname\":
\"bookie1.example.com\"}, \"" + bookie2
+ "\": {\"rack\": \"/rack1\", \"hostname\":
\"bookie2.example.com\"}}}";
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(), Optional.empty()).join();
+ store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(),
Optional.empty()).join();
// Case1: ZKCache is given
BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
@@ -133,7 +140,7 @@ public class BookieRackAffinityMappingTest {
+ "\": {\"rack\": \"/\", \"hostname\":
\"bookie1.example.com\"}, \"" + bookie2
+ "\": {\"rack\": \"\", \"hostname\":
\"bookie2.example.com\"}}}";
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(), Optional.empty()).join();
+ store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(),
Optional.empty()).join();
// Case1: ZKCache is given
BookieRackAffinityMapping mapping1 = new BookieRackAffinityMapping();
@@ -171,7 +178,7 @@ public class BookieRackAffinityMappingTest {
bookieMapping.put("group1", mainBookieGroup);
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ store.put(BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();
Awaitility.await().untilAsserted(() -> {
@@ -193,7 +200,7 @@ public class BookieRackAffinityMappingTest {
bookieMapping.put("group1", mainBookieGroup);
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ store.put(BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();
BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
@@ -213,7 +220,7 @@ public class BookieRackAffinityMappingTest {
secondaryBookieGroup.put(bookie3,
BookieInfo.builder().rack("rack0").build());
bookieMapping.put("group2", secondaryBookieGroup);
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ store.put(BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();
Awaitility.await().untilAsserted(() -> {
List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1",
"127.0.0.2", "127.0.0.3"));
@@ -221,7 +228,7 @@ public class BookieRackAffinityMappingTest {
assertEquals(r.get(1), "/rack1");
assertEquals(r.get(2), "/rack0");
});
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
"{}".getBytes(),
+ store.put(BOOKIE_INFO_ROOT_PATH, "{}".getBytes(),
Optional.empty()).join();
Awaitility.await().untilAsserted(() -> {
@@ -237,7 +244,7 @@ public class BookieRackAffinityMappingTest {
String data = "{\"group1\": {\"" + bookie1
+ "\": {\"rack\": \"/rack0\", \"hostname\":
\"bookie1.example.com\"}, \"" + bookie2
+ "\": {\"rack\": \"/rack1\", \"hostname\":
\"bookie2.example.com\"}}}";
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(), Optional.empty()).join();
+ store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(),
Optional.empty()).join();
// Case1: ZKCache is given
BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
@@ -260,11 +267,7 @@ public class BookieRackAffinityMappingTest {
assertEquals(racks.size(), 0);
@Cleanup("stop")
- HashedWheelTimer timer = new HashedWheelTimer(
- new
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
- bkClientConf.getTimeoutTimerTickDurationMs(),
TimeUnit.MILLISECONDS,
- bkClientConf.getTimeoutTimerNumTicks());
-
+ HashedWheelTimer timer = getTestHashedWheelTimer(bkClientConf);
RackawareEnsemblePlacementPolicy repp = new
RackawareEnsemblePlacementPolicy();
mapping.registerRackChangeListener(repp);
Class<?> clazz1 =
Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
@@ -342,7 +345,7 @@ public class BookieRackAffinityMappingTest {
//remove bookie2 rack, the bookie2 rack should be /default-rack
data = "{\"group1\": {\"" + bookie1
+ "\": {\"rack\": \"/rack0\", \"hostname\":
\"bookie1.example.com\"}}}";
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(), Optional.empty()).join();
+ store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(),
Optional.empty()).join();
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(
() -> ((BookiesRackConfiguration)
field.get(mapping)).get("group1").size() == 1);
@@ -367,10 +370,7 @@ public class BookieRackAffinityMappingTest {
mapping.setConf(bkClientConf);
@Cleanup("stop")
- HashedWheelTimer timer = new HashedWheelTimer(new
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
- bkClientConf.getTimeoutTimerTickDurationMs(),
TimeUnit.MILLISECONDS,
- bkClientConf.getTimeoutTimerNumTicks());
-
+ HashedWheelTimer timer = getTestHashedWheelTimer(bkClientConf);
RackawareEnsemblePlacementPolicy repp = new
RackawareEnsemblePlacementPolicy();
repp.initialize(bkClientConf, Optional.of(mapping), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
@@ -391,7 +391,7 @@ public class BookieRackAffinityMappingTest {
BookieRackAffinityMapping.class.getDeclaredMethod("handleUpdates",
Notification.class);
handleUpdates.setAccessible(true);
Notification n =
- new Notification(NotificationType.Modified,
BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+ new Notification(NotificationType.Modified,
BOOKIE_INFO_ROOT_PATH);
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < 2_000) {
handleUpdates.invoke(mapping, n);
@@ -415,4 +415,111 @@ public class BookieRackAffinityMappingTest {
assertTrue(count.await(3, TimeUnit.SECONDS));
}
+
+    /**
+     * Verifies that the placement policy ends up with the correct network location for a bookie even
+     * when the bookie watcher processes the writable-bookie change before the rack mapping is refreshed.
+     *
+     * <p>The test drives both steps manually through reflection: first
+     * {@code BookieWatcherImpl.processWritableBookiesChanged}, then
+     * {@code BookieRackAffinityMapping.processRackUpdate}, and finally inspects the policy's
+     * {@code knownBookies} map.
+     */
+    @Test
+    public void testZKEventListenersOrdering() throws Exception {
+        @Cleanup
+        PulsarRegistrationClient pulsarRegistrationClient =
+                new PulsarRegistrationClient(store, "/ledgers");
+        DefaultBookieAddressResolver defaultBookieAddressResolver =
+                new DefaultBookieAddressResolver(pulsarRegistrationClient);
+        // Create and configure the mapping
+        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
+        mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+        mapping.setConf(bkClientConf);
+
+        // Create RackawareEnsemblePlacementPolicy and initialize it
+        @Cleanup("stop")
+        HashedWheelTimer timer = getTestHashedWheelTimer(bkClientConf);
+        RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(bkClientConf, Optional.of(mapping), timer,
+                DISABLE_ALL, NullStatsLogger.INSTANCE, defaultBookieAddressResolver);
+        mapping.registerRackChangeListener(repp);
+
+        // Create a BookieWatcherImpl instance via reflection
+        Class<?> watcherClazz = Class.forName("org.apache.bookkeeper.client.BookieWatcherImpl");
+        Constructor<?> constructor = watcherClazz.getDeclaredConstructor(
+                ClientConfiguration.class,
+                EnsemblePlacementPolicy.class,
+                RegistrationClient.class,
+                BookieAddressResolver.class,
+                StatsLogger.class);
+        constructor.setAccessible(true);
+        Object watcher = constructor.newInstance(
+                bkClientConf,
+                repp,
+                pulsarRegistrationClient,
+                defaultBookieAddressResolver,
+                NullStatsLogger.INSTANCE
+        );
+        Method initMethod = watcherClazz.getDeclaredMethod("initialBlockingBookieRead");
+        initMethod.setAccessible(true);
+        initMethod.invoke(watcher);
+
+        // Prepare a BookiesRackConfiguration that maps bookie1 -> /rack0
+        BookieInfo bi = BookieInfo.builder().rack("/rack0").build();
+        BookiesRackConfiguration racks = new BookiesRackConfiguration();
+        racks.updateBookie("group1", bookie1.toString(), bi);
+
+        // Create a mock cache for racks /bookies
+        @SuppressWarnings("unchecked")
+        MetadataCache<BookiesRackConfiguration> mockCache = mock(MetadataCache.class);
+        Field f = BookieRackAffinityMapping.class.getDeclaredField("bookieMappingCache");
+        f.setAccessible(true);
+        f.set(mapping, mockCache);
+        when(mockCache.get(BOOKIE_INFO_ROOT_PATH))
+                .thenReturn(CompletableFuture.completedFuture(Optional.of(racks)));
+
+        // Inject the bookie address list into BookieRackAffinityMapping
+        Field addressListField = BookieRackAffinityMapping.class.getDeclaredField("bookieAddressListLastTime");
+        addressListField.setAccessible(true);
+        addressListField.set(mapping, List.of(bookie1.toBookieId()));
+
+        // Inject the writable bookie into PulsarRegistrationClient
+        Field writableField = PulsarRegistrationClient.class.getDeclaredField("writableBookieInfo");
+        writableField.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        Map<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo =
+                (Map<BookieId, Versioned<BookieServiceInfo>>) writableField.get(pulsarRegistrationClient);
+        writableBookieInfo.put(
+                bookie1.toBookieId(),
+                new Versioned<>(BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookie1.toString()), Version.NEW)
+        );
+
+        // watcher.processWritableBookiesChanged runs FIRST triggering RackAware ensemble policy listener
+        // → incorrect ordering
+        Method procMethod = watcherClazz.getDeclaredMethod("processWritableBookiesChanged", Set.class);
+        procMethod.setAccessible(true);
+        Set<BookieId> ids = new HashSet<>();
+        ids.add(bookie1.toBookieId());
+        procMethod.invoke(watcher, ids);
+
+        // BookieRackAffinityMapping rack mapping update runs SECOND → delayed rack info
+        Method processRackUpdateMethod = BookieRackAffinityMapping.class.getDeclaredMethod("processRackUpdate",
+                BookiesRackConfiguration.class);
+        processRackUpdateMethod.setAccessible(true);
+        processRackUpdateMethod.invoke(mapping, racks);
+
+        // -------------------
+        // NOW CHECK REPP INTERNAL STATE
+        // -------------------
+        // BookieNode.getNetworkLocation()
+        Class<?> clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
+        Field field1 = clazz1.getDeclaredField("knownBookies");
+        field1.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        Map<BookieId, BookieNode> knownBookies = (Map<BookieId, BookieNode>) field1.get(repp);
+        BookieNode bn = knownBookies.get(bookie1.toBookieId());
+        // Fail with a readable message instead of a NullPointerException if the bookie is missing.
+        assertTrue(bn != null, "bookie1 should be present in the policy's knownBookies");
+        // Rack info update is delayed but because of new callback the rack info on ensemble policy
+        // should be updated.
+        assertEquals(bn.getNetworkLocation(), "/rack0",
+                "Network location should match /rack0 on bookie");
+    }
+
+    /**
+     * Builds the {@link HashedWheelTimer} shared by the tests in this class.
+     *
+     * <p>Tick duration and wheel size are taken from the given client configuration; worker threads are
+     * named {@code TestTimer-N}. Callers in this class release it via {@code @Cleanup("stop")}.
+     *
+     * @param bkClientConf configuration supplying the timer tick duration and number of ticks
+     * @return a new timer instance
+     */
+    private static HashedWheelTimer getTestHashedWheelTimer(ClientConfiguration bkClientConf) {
+        final long tickDurationMs = bkClientConf.getTimeoutTimerTickDurationMs();
+        final int ticksPerWheel = bkClientConf.getTimeoutTimerNumTicks();
+        return new HashedWheelTimer(
+                new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+                tickDurationMs, TimeUnit.MILLISECONDS, ticksPerWheel);
+    }
}