This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new abb5981676 Rackaware placement policy support local node awareness by
hostname (#4057)
abb5981676 is described below
commit abb5981676a870c2d3b17b62d5b96f3a3f4a4d55
Author: Hang Chen <[email protected]>
AuthorDate: Wed Sep 20 15:14:25 2023 +0800
Rackaware placement policy support local node awareness by hostname (#4057)
### Motivation
Rack-aware placement policies enable preference for bookies that reside in
the same rack as the bookie client.
- The policy initializes the local node by resolving its rack information
https://github.com/apache/bookkeeper/blob/5b5c05331757e7356579076970e61f119f5d34ae/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L207-L215
- When generating a new ensemble for a ledger, the selection algorithm sets
`curRack` to the localNode's rack and selects one bookie from `curRack` first
https://github.com/apache/bookkeeper/blob/5b5c05331757e7356579076970e61f119f5d34ae/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L420-L426
However, when resolving the local node's rack information, we use the IP
address to resolve the rack name, which does not work well in k8s deployments.
https://github.com/apache/bookkeeper/blob/5b5c05331757e7356579076970e61f119f5d34ae/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L209
In k8s deployments, we usually use the hostname rather than the IP as the
bookieId and the Pulsar broker name, because the IP changes when the pods
are migrated to other nodes.
### Modification
To avoid a breaking change to the current behavior, I introduced a
flag `useHostnameResolveLocalNodePlacementPolicy` in the BookKeeper client
configuration to control whether to use the hostname to resolve the bookie
client's local node rack information. The flag is `false` by default, which
preserves the current behavior.
Because this PR doesn't introduce any breaking changes, I think we can
cherry-pick it back to the patch releases (branch-4.14, branch-4.15 and
branch-4.16).
---
.../client/RackawareEnsemblePlacementPolicy.java | 30 ++++++--
.../RackawareEnsemblePlacementPolicyImpl.java | 43 ++++++++++-
.../client/RegionAwareEnsemblePlacementPolicy.java | 9 ++-
.../bookkeeper/conf/ClientConfiguration.java | 19 +++++
.../TestRackawareEnsemblePlacementPolicy.java | 84 ++++++++++++++++++++++
5 files changed, 176 insertions(+), 9 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 72858f188f..1fb17ca3ef 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -58,20 +58,40 @@ public class RackawareEnsemblePlacementPolicy extends
RackawareEnsemblePlacement
boolean
enforceMinNumRacksPerWriteQuorum,
boolean
ignoreLocalNodeInPlacementPolicy,
StatsLogger statsLogger, BookieAddressResolver
bookieAddressResolver) {
+ return initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds,
+ reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
minNumRacksPerWriteQuorum,
+ enforceMinNumRacksPerWriteQuorum,
ignoreLocalNodeInPlacementPolicy, false,
+ statsLogger, bookieAddressResolver);
+ }
+
+ @Override
+ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping
dnsResolver,
+ HashedWheelTimer
timer,
+ boolean
reorderReadsRandom,
+ int
stabilizePeriodSeconds,
+ int
reorderThresholdPendingRequests,
+ boolean isWeighted,
+ int
maxWeightMultiple,
+ int
minNumRacksPerWriteQuorum,
+ boolean
enforceMinNumRacksPerWriteQuorum,
+ boolean
ignoreLocalNodeInPlacementPolicy,
+ boolean
useHostnameResolveLocalNodePlacementPolicy,
+ StatsLogger statsLogger, BookieAddressResolver
bookieAddressResolver) {
if (stabilizePeriodSeconds > 0) {
super.initialize(dnsResolver, timer, reorderReadsRandom, 0,
reorderThresholdPendingRequests, isWeighted,
maxWeightMultiple, minNumRacksPerWriteQuorum,
enforceMinNumRacksPerWriteQuorum,
- ignoreLocalNodeInPlacementPolicy, statsLogger,
bookieAddressResolver);
+ ignoreLocalNodeInPlacementPolicy,
useHostnameResolveLocalNodePlacementPolicy,
+ statsLogger, bookieAddressResolver);
slave = new
RackawareEnsemblePlacementPolicyImpl(enforceDurability);
slave.initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds,
reorderThresholdPendingRequests, isWeighted,
maxWeightMultiple, minNumRacksPerWriteQuorum,
- enforceMinNumRacksPerWriteQuorum,
ignoreLocalNodeInPlacementPolicy, statsLogger,
- bookieAddressResolver);
+ enforceMinNumRacksPerWriteQuorum,
ignoreLocalNodeInPlacementPolicy,
+ useHostnameResolveLocalNodePlacementPolicy, statsLogger,
bookieAddressResolver);
} else {
super.initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds,
reorderThresholdPendingRequests, isWeighted,
maxWeightMultiple, minNumRacksPerWriteQuorum,
- enforceMinNumRacksPerWriteQuorum,
ignoreLocalNodeInPlacementPolicy, statsLogger,
- bookieAddressResolver);
+ enforceMinNumRacksPerWriteQuorum,
ignoreLocalNodeInPlacementPolicy,
+ useHostnameResolveLocalNodePlacementPolicy, statsLogger,
bookieAddressResolver);
slave = null;
}
return this;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 6ec9e5b158..7f219854ed 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -90,6 +90,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
protected int minNumRacksPerWriteQuorum;
protected boolean enforceMinNumRacksPerWriteQuorum;
protected boolean ignoreLocalNodeInPlacementPolicy;
+ protected boolean useHostnameResolveLocalNodePlacementPolicy;
public static final String REPP_RANDOM_READ_REORDERING =
"ensembleRandomReadReordering";
@@ -144,6 +145,41 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
topology = new NetworkTopologyImpl();
}
+ /**
+ * Initialize the policy without the hostname-resolution flag.
+ *
+ * <p>Backward-compatible overload: it delegates to the overload that also
+ * takes {@code useHostnameResolveLocalNodePlacementPolicy}, passing
+ * {@code false}, so the local node keeps being resolved from its IP address.
+ *
+ * @param dnsResolver
+ * @param timer
+ * @param reorderReadsRandom
+ * @param stabilizePeriodSeconds
+ * @param reorderThresholdPendingRequests
+ * @param isWeighted
+ * @param maxWeightMultiple
+ * @param minNumRacksPerWriteQuorum
+ * @param enforceMinNumRacksPerWriteQuorum
+ * @param ignoreLocalNodeInPlacementPolicy
+ *          when {@code true}, no local bookie node is created for the client
+ * @param statsLogger
+ *          must not be null; use NullStatsLogger instead
+ * @param bookieAddressResolver
+ * @return initialized ensemble placement policy
+ */
+ protected RackawareEnsemblePlacementPolicyImpl
initialize(DNSToSwitchMapping dnsResolver,
+ HashedWheelTimer
timer,
+ boolean
reorderReadsRandom,
+ int
stabilizePeriodSeconds,
+ int
reorderThresholdPendingRequests,
+ boolean
isWeighted,
+ int
maxWeightMultiple,
+ int
minNumRacksPerWriteQuorum,
+ boolean
enforceMinNumRacksPerWriteQuorum,
+ boolean
ignoreLocalNodeInPlacementPolicy,
+ StatsLogger
statsLogger,
+
BookieAddressResolver bookieAddressResolver) {
+ return initialize(dnsResolver, timer, reorderReadsRandom,
stabilizePeriodSeconds,
+ reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
minNumRacksPerWriteQuorum,
+ enforceMinNumRacksPerWriteQuorum, ignoreLocalNodeInPlacementPolicy,
+ false, statsLogger, bookieAddressResolver);
+ }
+
/**
* Initialize the policy.
*
@@ -160,6 +196,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
int
minNumRacksPerWriteQuorum,
boolean
enforceMinNumRacksPerWriteQuorum,
boolean
ignoreLocalNodeInPlacementPolicy,
+ boolean
useHostnameResolveLocalNodePlacementPolicy,
StatsLogger
statsLogger,
BookieAddressResolver bookieAddressResolver) {
checkNotNull(statsLogger, "statsLogger should not be null, use
NullStatsLogger instead.");
@@ -195,6 +232,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
this.enforceMinNumRacksPerWriteQuorum =
enforceMinNumRacksPerWriteQuorum;
this.ignoreLocalNodeInPlacementPolicy =
ignoreLocalNodeInPlacementPolicy;
+ this.useHostnameResolveLocalNodePlacementPolicy =
useHostnameResolveLocalNodePlacementPolicy;
// create the network topology
if (stabilizePeriodSeconds > 0) {
@@ -206,7 +244,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
BookieNode bn = null;
if (!ignoreLocalNodeInPlacementPolicy) {
try {
- bn =
createDummyLocalBookieNode(InetAddress.getLocalHost().getHostAddress());
+ String hostname = useHostnameResolveLocalNodePlacementPolicy
+ ? InetAddress.getLocalHost().getCanonicalHostName() :
InetAddress.getLocalHost().getHostAddress();
+ bn = createDummyLocalBookieNode(hostname);
} catch (IOException e) {
LOG.error("Failed to get local host address : ", e);
}
@@ -303,6 +343,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends
TopologyAwareEnsembleP
conf.getMinNumRacksPerWriteQuorum(),
conf.getEnforceMinNumRacksPerWriteQuorum(),
conf.getIgnoreLocalNodeInPlacementPolicy(),
+ conf.getUseHostnameResolveLocalNodePlacementPolicy(),
statsLogger,
bookieAddressResolver);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 43969b8fde..5fcfd0f94f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -140,7 +140,8 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
.initialize(dnsResolver, timer,
this.reorderReadsRandom, this.stabilizePeriodSeconds,
this.reorderThresholdPendingRequests,
this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum,
this.enforceMinNumRacksPerWriteQuorum,
- this.ignoreLocalNodeInPlacementPolicy,
statsLogger, bookieAddressResolver)
+ this.ignoreLocalNodeInPlacementPolicy,
+
this.useHostnameResolveLocalNodePlacementPolicy, statsLogger,
bookieAddressResolver)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}
@@ -201,7 +202,8 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
this.stabilizePeriodSeconds,
this.reorderThresholdPendingRequests,
this.isWeighted,
this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum,
-
this.ignoreLocalNodeInPlacementPolicy, statsLogger,
+
this.ignoreLocalNodeInPlacementPolicy,
+
this.useHostnameResolveLocalNodePlacementPolicy, statsLogger,
bookieAddressResolver)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
perRegionPlacement.put(newRegion,
newRegionPlacement);
@@ -242,7 +244,8 @@ public class RegionAwareEnsemblePlacementPolicy extends
RackawareEnsemblePlaceme
.initialize(dnsResolver, timer,
this.reorderReadsRandom, this.stabilizePeriodSeconds,
this.reorderThresholdPendingRequests,
this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum,
this.enforceMinNumRacksPerWriteQuorum,
- this.ignoreLocalNodeInPlacementPolicy,
statsLogger, bookieAddressResolver)
+ this.ignoreLocalNodeInPlacementPolicy,
this.useHostnameResolveLocalNodePlacementPolicy,
+ statsLogger, bookieAddressResolver)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}
minRegionsForDurability =
conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 8aaa8bbff4..297a2f62f4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -161,6 +161,9 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES
=
"ensemblePlacementPolicyOrderSlowBookies";
protected static final String BOOKIE_ADDRESS_RESOLVER_ENABLED =
"bookieAddressResolverEnabled";
+ // Use hostname to resolve local placement info
+ public static final String
USE_HOSTNAME_RESOLVE_LOCAL_NODE_PLACEMENT_POLICY =
+ "useHostnameResolveLocalNodePlacementPolicy";
// Stats
protected static final String ENABLE_TASK_EXECUTION_STATS =
"enableTaskExecutionStats";
@@ -1314,6 +1317,22 @@ public class ClientConfiguration extends
AbstractConfiguration<ClientConfigurati
return this;
}
+ /**
+ * Set whether the rack-aware placement policy resolves the bookie client's
+ * local node by hostname instead of by IP address.
+ *
+ * @param useHostnameResolveLocalNodePlacementPolicy
+ *          {@code true} to resolve the local node by its canonical hostname,
+ *          {@code false} to resolve it by its IP address
+ */
+ public void setUseHostnameResolveLocalNodePlacementPolicy(boolean
useHostnameResolveLocalNodePlacementPolicy) {
+ setProperty(USE_HOSTNAME_RESOLVE_LOCAL_NODE_PLACEMENT_POLICY,
useHostnameResolveLocalNodePlacementPolicy);
+ }
+
+ /**
+ * Get whether to use hostname to resolve local node placement policy.
+ *
+ * @return {@code true} if the local node is resolved by hostname;
+ *         {@code false} (the default when the property is unset) if it is
+ *         resolved by IP address
+ */
+ public boolean getUseHostnameResolveLocalNodePlacementPolicy() {
+ return getBoolean(USE_HOSTNAME_RESOLVE_LOCAL_NODE_PLACEMENT_POLICY,
false);
+ }
+
/**
* Whether to enable recording task execution stats.
*
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 95a7d5b40d..ed37159ee1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -1689,6 +1689,90 @@ public class TestRackawareEnsemblePlacementPolicy
extends TestCase {
}
}
+ @Test
+ public void testNewEnsemblePickLocalRackBookiesByHostname() throws
Exception {
+ testNewEnsemblePickLocalRackBookiesInternal(true); // flag on: local node mapped to its rack by canonical hostname
+ }
+
+ @Test
+ public void testNewEnsemblePickLocalRackBookiesByIP() throws Exception {
+ testNewEnsemblePickLocalRackBookiesInternal(false); // flag off: local node mapped to its rack by IP address
+ }
+
+ public void testNewEnsemblePickLocalRackBookiesInternal(boolean
useHostnameResolveLocalNodePlacementPolicy)
+ throws Exception {
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
+ BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
+ BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
+
+ // update dns mapping: addr1 is alone in r1, addr2-addr4 share r2, addr5/addr6/addr7 are in r3/r4/r5
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(),
"/default-region/r1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(),
"/default-region/r2");
+ StaticDNSResolver.addNodeToRack(addr5.getHostName(),
"/default-region/r3");
+ StaticDNSResolver.addNodeToRack(addr6.getHostName(),
"/default-region/r4");
+ StaticDNSResolver.addNodeToRack(addr7.getHostName(),
"/default-region/r5");
+
+ String hostname = useHostnameResolveLocalNodePlacementPolicy
+ ? InetAddress.getLocalHost().getCanonicalHostName() :
InetAddress.getLocalHost().getHostAddress();
+ StaticDNSResolver.addNodeToRack(hostname, "/default-region/r1"); // put the local (client) node in addr1's rack
+ if (useHostnameResolveLocalNodePlacementPolicy) {
+
conf.setUseHostnameResolveLocalNodePlacementPolicy(useHostnameResolveLocalNodePlacementPolicy);
+ }
+
+ repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
+ DISABLE_ALL, NullStatsLogger.INSTANCE,
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+ // Update cluster
+ Set<BookieId> addrs = new HashSet<BookieId>();
+ addrs.add(addr1.toBookieId());
+ addrs.add(addr2.toBookieId());
+ addrs.add(addr3.toBookieId());
+ addrs.add(addr4.toBookieId());
+ addrs.add(addr5.toBookieId());
+ addrs.add(addr6.toBookieId());
+ addrs.add(addr7.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+
+ int ensembleSize = 3;
+ int writeQuorumSize = 3;
+ int ackQuorumSize = 2;
+
+ Set<BookieId> excludeBookies = new HashSet<>();
+
+ for (int i = 0; i < 50000; ++i) {
+ EnsemblePlacementPolicy.PlacementResult<List<BookieId>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, excludeBookies);
+ List<BookieId> ensemble = ensembleResponse.getResult();
+ if (!ensemble.contains(addr1.toBookieId())) {
+ fail("Failed to select bookie located on the same rack with
bookie client");
+ }
+ if (ensemble.contains(addr2.toBookieId()) &&
ensemble.contains(addr3.toBookieId())) {
+ fail("addr2 and addr3 is same rack.");
+ }
+ }
+
+ // addr5 shutdown (the only bookie mapped to r3); addr1 must still be picked.
+ addrs.remove(addr5.toBookieId());
+ repp.onClusterChanged(addrs, new HashSet<BookieId>());
+ for (int i = 0; i < 50000; ++i) {
+ EnsemblePlacementPolicy.PlacementResult<List<BookieId>>
ensembleResponse =
+ repp.newEnsemble(ensembleSize, writeQuorumSize,
+ ackQuorumSize, null, excludeBookies);
+ List<BookieId> ensemble = ensembleResponse.getResult();
+ if (!ensemble.contains(addr1.toBookieId())) {
+ fail("Failed to select bookie located on the same rack with
bookie client");
+ }
+ }
+
+ }
+
@Test
public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception {
int numOfRacksToCreate = 6;