This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch branch-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 84529a857a28ff6ee3c59437ce8f5b11a55ae56d Author: Hang Chen <[email protected]> AuthorDate: Mon Jan 17 10:21:31 2022 +0800 fix region aware placement policy use disk weight not work (#2981) ### Motivation When we meet the following conditions: 1. configured region aware placement policy 2. enable disk weight based placement 3. fallback random selection when selecting ensemble bookies, such as: - not enough regions - rack number less than 2 in one region It will throw the following exception, and create ledger failed. ``` 12:15:36.459 [bookkeeper-ml-scheduler-OrderedScheduler-1-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/test_v2] Encountered unexpected error when creating ledger java.lang.NullPointerException: null at org.apache.bookkeeper.client.WeightedRandomSelectionImpl.getNextRandom(WeightedRandomSelectionImpl.java:150) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.selectRandomInternal(RackawareEnsemblePlacementPolicyImpl.java:748) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.selectRandom(RackawareEnsemblePlacementPolicyImpl.java:698) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.newEnsembleInternal(RackawareEnsemblePlacementPolicyImpl.java:409) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.newEnsemble(RackawareEnsemblePlacementPolicyImpl.java:372) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.newEnsemble(RackawareEnsemblePlacementPolicy.java:159) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at 
org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.newEnsemble(RegionAwareEnsemblePlacementPolicy.java:303) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.BookieWatcherImpl.newEnsemble(BookieWatcherImpl.java:270) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.LedgerCreateOp.initiate(LedgerCreateOp.java:161) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.client.BookKeeper.asyncCreateLedger(BookKeeper.java:860) ~[io.streamnative-bookkeeper-server-4.14.3.1.jar:4.14.3.1] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncCreateLedger(ManagedLedgerImpl.java:3657) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.initializeBookKeeper(ManagedLedgerImpl.java:460) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.access$400(ManagedLedgerImpl.java:141) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$1.operationComplete(ManagedLedgerImpl.java:396) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$1.operationComplete(ManagedLedgerImpl.java:328) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda$getManagedLedgerInfo$2(MetaStoreImpl.java:97) ~[io.streamnative-managed-ledger-2.8.1.30.jar:2.8.1.30] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) [?:?] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?] at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [io.streamnative-bookkeeper-common-4.14.3.1.jar:4.14.3.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] 
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final] at java.lang.Thread.run(Thread.java:834) [?:?] ``` The root cause is that in `selectRandomInternal`, the `wRSelection` has never been updated with any bookie map, and the fields `randomMax` and `cummulativeMap` are never initialized. ### Modification 1. Update the `wRSelection`'s map in the `selectRandomInternal` method regardless of whether `wRSelection` has been set before. (cherry picked from commit 002725ea5a25073de190bfe29f622d85fa264f0a) --- .../RackawareEnsemblePlacementPolicyImpl.java | 25 ++++++++-------- .../client/WeightedRandomSelectionImpl.java | 2 -- .../TestRegionAwareEnsemblePlacementPolicy.java | 35 ++++++++++++++++++++++ 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java index a4e3c8024..b561b9fe0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java @@ -715,20 +715,21 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP throw new BKNotEnoughBookiesException(); } if (wRSelection == null) { - Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>(); - for (BookieNode n : bookiesToSelectFrom) { - if 
(excludeBookies.contains(n)) { - continue; - } - if (this.bookieInfoMap.containsKey(n)) { - rackMap.put(n, this.bookieInfoMap.get(n)); - } else { - rackMap.put(n, new BookieInfo()); - } - } wRSelection = new WeightedRandomSelectionImpl<BookieNode>(this.maxWeightMultiple); - wRSelection.updateMap(rackMap); } + + Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>(); + for (BookieNode n : bookiesToSelectFrom) { + if (excludeBookies.contains(n)) { + continue; + } + if (this.bookieInfoMap.containsKey(n)) { + rackMap.put(n, this.bookieInfoMap.get(n)); + } else { + rackMap.put(n, new BookieInfo()); + } + } + wRSelection.updateMap(rackMap); } else { Collections.shuffle(bookiesToSelectFrom); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java index 40ad35e5c..8c2f1e2a3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelectionImpl.java @@ -150,8 +150,6 @@ class WeightedRandomSelectionImpl<T> implements WeightedRandomSelection<T> { Double randomNum = randomMax * Math.random(); // find the nearest key in the map corresponding to the randomNum Double key = cummulativeMap.floorKey(randomNum); - //LOG.info("Random max: {} CummulativeMap size: {} selected key: {}", randomMax, cummulativeMap.size(), - // key); return cummulativeMap.get(key); } finally { rwLock.readLock().unlock(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index 39649a66c..fc4e02cae 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -1572,4 +1572,39 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase { fail("Should not get not enough bookies exception even there is only one rack."); } } + + public void testRegionsWithDiskWeight() throws Exception { + repp.uninitalize(); + repp = new RegionAwareEnsemblePlacementPolicy(); + conf.setProperty(REPP_ENABLE_VALIDATION, false); + conf.setDiskWeightBasedPlacementEnabled(true); + repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL, + NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181); + BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181); + + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region2/r3"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region3/r11"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region4/r13"); + StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region5/r23"); + // Update cluster + Set<BookieId> addrs = new HashSet<BookieId>(); + addrs.add(addr1.toBookieId()); + addrs.add(addr2.toBookieId()); + addrs.add(addr3.toBookieId()); + addrs.add(addr4.toBookieId()); + addrs.add(addr5.toBookieId()); + + repp.onClusterChanged(addrs, new HashSet<BookieId>()); + + List<BookieId> ensemble = repp.newEnsemble(3, 3, 2, null, + new HashSet<>()).getResult(); + + assertEquals(3, ensemble.size()); + } }
