This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new bf361fc4417 [fix][broker] Fix the deadlock when using BookieRackAffinityMapping with rackaware policy (#21481)
bf361fc4417 is described below
commit bf361fc4417700d1ef36a6c53d3433a4ab6c2f12
Author: erobot <[email protected]>
AuthorDate: Fri Nov 10 18:54:01 2023 +0800
[fix][broker] Fix the deadlock when using BookieRackAffinityMapping with rackaware policy (#21481)
---
.../rackawareness/BookieRackAffinityMapping.java | 9 +--
.../BookieRackAffinityMappingTest.java | 68 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index d54ef2a5f4c..983822f2294 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -245,6 +245,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenAccept(optVal -> {
+ Set<BookieId> bookieIdSet = new HashSet<>();
synchronized (this) {
LOG.info("Bookie rack info updated to {}. Notifying
rackaware policy.", optVal);
this.updateRacksWithHost(optVal.orElseGet(BookiesRackConfiguration::new));
@@ -259,12 +260,12 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
LOG.debug("Bookies with rack update from {} to
{}", bookieAddressListLastTime,
bookieAddressList);
}
- Set<BookieId> bookieIdSet = new
HashSet<>(bookieAddressList);
+ bookieIdSet.addAll(bookieAddressList);
bookieIdSet.addAll(bookieAddressListLastTime);
bookieAddressListLastTime = bookieAddressList;
- if (rackawarePolicy != null) {
- rackawarePolicy.onBookieRackChange(new
ArrayList<>(bookieIdSet));
- }
+ }
+ if (rackawarePolicy != null) {
+ rackawarePolicy.onBookieRackChange(new
ArrayList<>(bookieIdSet));
}
});
}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index d7df5afb4be..9cd81604442 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.bookie.rackawareness;
import static
org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -28,6 +29,7 @@ import io.netty.util.HashedWheelTimer;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -35,7 +37,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -46,6 +52,7 @@ import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -55,6 +62,8 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.bookkeeper.BookieServiceInfoSerde;
import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationClient;
import org.awaitility.Awaitility;
@@ -342,4 +351,63 @@ public class BookieRackAffinityMappingTest {
timer.stop();
}
+
+ @Test
+ public void testNoDeadlockWithRackawarePolicy() throws Exception {
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE,
store);
+
+ BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+
mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ mapping.setConf(bkClientConf);
+
+ @Cleanup("stop")
+ HashedWheelTimer timer = new HashedWheelTimer(new
ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+ bkClientConf.getTimeoutTimerTickDurationMs(),
TimeUnit.MILLISECONDS,
+ bkClientConf.getTimeoutTimerNumTicks());
+
+ RackawareEnsemblePlacementPolicy repp = new
RackawareEnsemblePlacementPolicy();
+ repp.initialize(bkClientConf, Optional.of(mapping), timer,
+ DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ mapping.registerRackChangeListener(repp);
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor1 = Executors.newSingleThreadExecutor();
+ @Cleanup("shutdownNow")
+ ExecutorService executor2 = Executors.newSingleThreadExecutor();
+
+ CountDownLatch count = new CountDownLatch(2);
+
+ executor1.submit(() -> {
+ try {
+ Method handleUpdates =
+
BookieRackAffinityMapping.class.getDeclaredMethod("handleUpdates",
Notification.class);
+ handleUpdates.setAccessible(true);
+ Notification n =
+ new Notification(NotificationType.Modified,
BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 2_000) {
+ handleUpdates.invoke(mapping, n);
+ }
+ count.countDown();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ executor2.submit(() -> {
+ Set<BookieId> writableBookies = new HashSet<>();
+ writableBookies.add(BOOKIE1.toBookieId());
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 2_000) {
+ repp.onClusterChanged(writableBookies, Collections.emptySet());
+ repp.onClusterChanged(Collections.emptySet(),
Collections.emptySet());
+ }
+ count.countDown();
+ });
+
+ assertTrue(count.await(3, TimeUnit.SECONDS));
+ }
}