This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ed26084a9748086d75f6b4166f21d2093edc8e9c Author: Malla Sandeep <[email protected]> AuthorDate: Fri Dec 12 11:25:53 2025 +0530 [fix][broker] Force EnsemblePolicies to resolve network location after rackInfoMap is updated due to changes in /ledgers/available znode (#25067) (cherry picked from commit 76a2394022b0f02ec0cbf823f72656826174b265) --- .../rackawareness/BookieRackAffinityMapping.java | 26 +++- .../BookieRackAffinityMappingTest.java | 143 ++++++++++++++++++--- 2 files changed, 146 insertions(+), 23 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index 2df4dc22c14..d1662100e3e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METAD import java.lang.reflect.Field; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -64,7 +65,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping public static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE"; private MetadataCache<BookiesRackConfiguration> bookieMappingCache = null; - private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null; + private volatile ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null; private List<BookieId> bookieAddressListLastTime = new ArrayList<>(); private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration(); @@ -157,7 +158,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping registrationClient.watchWritableBookies(versioned -> { bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH) .thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new)) - .thenAccept(this::updateRacksWithHost) + .thenApply(this::processRackUpdate) .exceptionally(ex -> { LOG.error("Failed to update rack info. ", ex); return null; @@ -169,6 +170,17 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping } } + private Void processRackUpdate(BookiesRackConfiguration racks) { + ArrayList<BookieId> bookieIdSet; + synchronized (this) { + updateRacksWithHost(racks); + bookieIdSet = new ArrayList<>(bookieAddressListLastTime); + } + // Notify ensemble placement policy after rack info is updated to ensure consistent state. + rackChangeListenerCallback(bookieIdSet); + return null; + } + private synchronized void updateRacksWithHost(BookiesRackConfiguration racks) { // In config z-node, the bookies are added in the `ip:port` notation, while BK will ask // for just the IP/hostname when trying to get the rack for a bookie. @@ -274,12 +286,16 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping bookieIdSet.addAll(bookieAddressListLastTime); bookieAddressListLastTime = bookieAddressList; } - if (rackawarePolicy != null) { - rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet)); - } + rackChangeListenerCallback(bookieIdSet); }); } + private void rackChangeListenerCallback(Collection<BookieId> bookieIdSet) { + if (rackawarePolicy != null) { + rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet)); + } + } + @Override public void registerRackChangeListener(ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy) { this.rackawarePolicy = rackawarePolicy; diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java index 46466dc6f97..a8f39917869 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java @@ -19,6 +19,9 @@ package org.apache.pulsar.bookie.rackawareness; import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; +import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -37,6 +40,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -56,9 +60,12 @@ import org.apache.bookkeeper.net.NetworkTopology; import org.apache.bookkeeper.proto.BookieAddressResolver; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreFactory; @@ -98,7 +105,7 @@ public class BookieRackAffinityMappingTest { String data = "{\"group1\": {\"" + bookie1 + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}, \"" + bookie2 + "\": {\"rack\": \"/rack1\", \"hostname\": \"bookie2.example.com\"}}}"; - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); + store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); // Case1: ZKCache is given BookieRackAffinityMapping mapping = new BookieRackAffinityMapping(); @@ -133,7 +140,7 @@ public class BookieRackAffinityMappingTest { + "\": {\"rack\": \"/\", \"hostname\": \"bookie1.example.com\"}, \"" + bookie2 + "\": {\"rack\": \"\", \"hostname\": \"bookie2.example.com\"}}}"; - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); + store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); // Case1: ZKCache is given BookieRackAffinityMapping mapping1 = new BookieRackAffinityMapping(); @@ -171,7 +178,7 @@ public class BookieRackAffinityMappingTest { bookieMapping.put("group1", mainBookieGroup); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), + store.put(BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), Optional.empty()).join(); Awaitility.await().untilAsserted(() -> { @@ -193,7 +200,7 @@ public class BookieRackAffinityMappingTest { bookieMapping.put("group1", mainBookieGroup); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), + store.put(BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), Optional.empty()).join(); BookieRackAffinityMapping mapping = new BookieRackAffinityMapping(); @@ -213,7 +220,7 @@ public class BookieRackAffinityMappingTest { secondaryBookieGroup.put(bookie3, BookieInfo.builder().rack("rack0").build()); bookieMapping.put("group2", secondaryBookieGroup); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), + store.put(BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), Optional.empty()).join(); Awaitility.await().untilAsserted(() -> { List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3")); @@ -221,7 +228,7 @@ public class BookieRackAffinityMappingTest { assertEquals(r.get(1), "/rack1"); assertEquals(r.get(2), "/rack0"); }); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes(), + store.put(BOOKIE_INFO_ROOT_PATH, "{}".getBytes(), Optional.empty()).join(); Awaitility.await().untilAsserted(() -> { @@ -237,7 +244,7 @@ public class BookieRackAffinityMappingTest { String data = "{\"group1\": {\"" + bookie1 + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}, \"" + bookie2 + "\": {\"rack\": \"/rack1\", \"hostname\": \"bookie2.example.com\"}}}"; - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); + store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); // Case1: ZKCache is given BookieRackAffinityMapping mapping = new BookieRackAffinityMapping(); @@ -260,11 +267,7 @@ public class BookieRackAffinityMappingTest { assertEquals(racks.size(), 0); @Cleanup("stop") - HashedWheelTimer timer = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(), - bkClientConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, - bkClientConf.getTimeoutTimerNumTicks()); - + HashedWheelTimer timer = getTestHashedWheelTimer(bkClientConf); RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy(); mapping.registerRackChangeListener(repp); Class<?> clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy"); @@ -342,7 +345,7 @@ public class BookieRackAffinityMappingTest { //remove bookie2 rack, the bookie2 rack should be /default-rack data = "{\"group1\": {\"" + bookie1 + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}}}"; - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); + store.put(BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); Awaitility.await().atMost(30, TimeUnit.SECONDS).until( () -> ((BookiesRackConfiguration) field.get(mapping)).get("group1").size() == 1); @@ -367,10 +370,7 @@ public class BookieRackAffinityMappingTest { mapping.setConf(bkClientConf); @Cleanup("stop") - HashedWheelTimer timer = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(), - bkClientConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, - bkClientConf.getTimeoutTimerNumTicks()); - + HashedWheelTimer timer = getTestHashedWheelTimer(bkClientConf); RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy(); repp.initialize(bkClientConf, Optional.of(mapping), timer, DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); @@ -391,7 +391,7 @@ public class BookieRackAffinityMappingTest { BookieRackAffinityMapping.class.getDeclaredMethod("handleUpdates", Notification.class); handleUpdates.setAccessible(true); Notification n = - new Notification(NotificationType.Modified, BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH); + new Notification(NotificationType.Modified, BOOKIE_INFO_ROOT_PATH); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < 2_000) { handleUpdates.invoke(mapping, n); @@ -415,4 +415,111 @@ public class BookieRackAffinityMappingTest { assertTrue(count.await(3, TimeUnit.SECONDS)); } + + @Test + public void testZKEventListenersOrdering() throws Exception { + @Cleanup + PulsarRegistrationClient pulsarRegistrationClient = + new PulsarRegistrationClient(store, "/ledgers"); + DefaultBookieAddressResolver defaultBookieAddressResolver = + new DefaultBookieAddressResolver(pulsarRegistrationClient); + // Create and configure the mapping + BookieRackAffinityMapping mapping = new BookieRackAffinityMapping(); + ClientConfiguration bkClientConf = new ClientConfiguration(); + bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store); + mapping.setBookieAddressResolver(defaultBookieAddressResolver); + mapping.setConf(bkClientConf); + + // Create RackawareEnsemblePlacementPolicy and initialize it + @Cleanup("stop") + HashedWheelTimer timer = getTestHashedWheelTimer(bkClientConf); + RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy(); + repp.initialize(bkClientConf, Optional.of(mapping), timer, + DISABLE_ALL, NullStatsLogger.INSTANCE, defaultBookieAddressResolver); + mapping.registerRackChangeListener(repp); + + // Create a BookieWatcherImpl instance via reflection + Class<?> watcherClazz = Class.forName("org.apache.bookkeeper.client.BookieWatcherImpl"); + Constructor<?> constructor = watcherClazz.getDeclaredConstructor( + ClientConfiguration.class, + EnsemblePlacementPolicy.class, + RegistrationClient.class, + BookieAddressResolver.class, + StatsLogger.class); + constructor.setAccessible(true); + Object watcher = constructor.newInstance( + bkClientConf, + repp, + pulsarRegistrationClient, + defaultBookieAddressResolver, + NullStatsLogger.INSTANCE + ); + Method initMethod = watcherClazz.getDeclaredMethod("initialBlockingBookieRead"); + initMethod.setAccessible(true); + initMethod.invoke(watcher); + + // Prepare a BookiesRackConfiguration that maps bookie1 -> /rack0 + BookieInfo bi = BookieInfo.builder().rack("/rack0").build(); + BookiesRackConfiguration racks = new BookiesRackConfiguration(); + racks.updateBookie("group1", bookie1.toString(), bi); + + // Create a mock cache for racks /bookies + MetadataCache<BookiesRackConfiguration> mockCache = mock(MetadataCache.class); + Field f = BookieRackAffinityMapping.class.getDeclaredField("bookieMappingCache"); + f.setAccessible(true); + f.set(mapping, mockCache); + when(mockCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)) + .thenReturn(CompletableFuture.completedFuture(Optional.of(racks))); + + // Inject the bookie address list into BookieRackAffinityMapping + Field addressListField = BookieRackAffinityMapping.class.getDeclaredField("bookieAddressListLastTime"); + addressListField.setAccessible(true); + addressListField.set(mapping, List.of(bookie1.toBookieId())); + + // Inject the writable bookie into PulsarRegistrationClient + Field writableField = PulsarRegistrationClient.class.getDeclaredField("writableBookieInfo"); + writableField.setAccessible(true); + @SuppressWarnings("unchecked") + Map<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo = + (Map<BookieId, Versioned<BookieServiceInfo>>) writableField.get(pulsarRegistrationClient); + writableBookieInfo.put( + bookie1.toBookieId(), + new Versioned<>(BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookie1.toString()), Version.NEW) + ); + + // watcher.processWritableBookiesChanged runs FIRST triggering RackAware ensemble policy listener → incorrect + // ordering + Method procMethod = + watcherClazz.getDeclaredMethod("processWritableBookiesChanged", java.util.Set.class); + procMethod.setAccessible(true); + Set<BookieId> ids = new HashSet<>(); + ids.add(bookie1.toBookieId()); + procMethod.invoke(watcher, ids); + + // BookieRackAffinityMapping rack mapping update runs SECOND → delayed rack info + Method processRackUpdateMethod = BookieRackAffinityMapping.class.getDeclaredMethod("processRackUpdate", + BookiesRackConfiguration.class); + processRackUpdateMethod.setAccessible(true); + processRackUpdateMethod.invoke(mapping, racks); + + // ------------------- + // NOW CHECK REPP INTERNAL STATE + // ------------------- + // BookieNode.getNetworkLocation() + Class<?> clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy"); + Field field1 = clazz1.getDeclaredField("knownBookies"); + field1.setAccessible(true); + Map<BookieId, BookieNode> knownBookies = (Map<BookieId, BookieNode>) field1.get(repp); + BookieNode bn = knownBookies.get(bookie1.toBookieId()); + // Rack info update is delayed but because of new callback the rack info on ensemble policy should be updated. + assertEquals(bn.getNetworkLocation(), "/rack0", + "Network location should match /rack0 on bookie"); + } + + private static HashedWheelTimer getTestHashedWheelTimer(ClientConfiguration bkClientConf) { + return new HashedWheelTimer( + new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(), + bkClientConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, + bkClientConf.getTimeoutTimerNumTicks()); + } }
