ndimiduk commented on a change in pull request #2130:
URL: https://github.com/apache/hbase/pull/2130#discussion_r466021691
##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -170,6 +214,11 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostException
     callable.call(controller, stub, resp -> {
       if (controller.failed()) {
         future.completeExceptionally(controller.getFailed());
+        // RPC has failed, trigger a refresh of master end points. We can have some spurious
Review comment:
It seems like something we want to ensure is well throttled, so that hundreds of
clients are not overwhelming 3 masters while those masters are trying to manage
cluster recovery. I've recently been investigating an incident where the RS hosting
meta was overwhelmed by client requests while the master was concurrently trying to
process server crash procedures, so the idea of a stampeding herd is fresh in my
mind. Our IPC QoS story isn't great right now, so I want to think carefully about how
failure and recovery scenarios play out with client:server ratios on the order of
100:1.
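To make the concern concrete, here is a minimal sketch of the kind of throttle I have in mind, so that failure-triggered refreshes from many clients collapse into at most one wake-up of the refresher per interval. All of the names here (`RefreshThrottle`, `MIN_REFRESH_INTERVAL_NANOS`, `lastRefreshNanos`) are hypothetical and not part of this patch:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical throttle for failure-triggered refreshes; not code from this patch. */
class RefreshThrottle {
  private static final long MIN_REFRESH_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(30);
  // Initialized so the very first failure is allowed to trigger a refresh immediately.
  private final AtomicLong lastRefreshNanos =
      new AtomicLong(System.nanoTime() - MIN_REFRESH_INTERVAL_NANOS);
  private final Object refreshMasters; // the monitor the refresher thread waits on

  RefreshThrottle(Object refreshMasters) {
    this.refreshMasters = refreshMasters;
  }

  /** Called from the RPC failure path; only one caller per interval actually wakes the refresher. */
  void maybeScheduleRefresh() {
    long last = lastRefreshNanos.get();
    long now = System.nanoTime();
    if (now - last >= MIN_REFRESH_INTERVAL_NANOS && lastRefreshNanos.compareAndSet(last, now)) {
      synchronized (refreshMasters) {
        refreshMasters.notify();
      }
    }
  }
}
```

Something along those lines would keep a fleet of clients from turning every RPC failure into a refresh storm against the masters.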
##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
+          // have duplicate refreshes because once the thread is past the wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
Review comment:
Someone who has the HBase client embedded in their application might be interested in
observing this behavior, say if they notice sudden spikes in RPC traffic not directly
correlated with their application's data path.
Or maybe it's enough to track these calls on the server side; we'd see the same
spikes, though with less granularity.
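If we did want client-side visibility, it would not need to be much: a couple of counters the embedding application can poll and correlate against its own RPC dashboards. A rough sketch with purely hypothetical names (nothing like this exists in the patch):

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical client-side counters for master endpoint refreshes; illustration only. */
class MasterRegistryRefreshMetrics {
  private final AtomicLong refreshAttempts = new AtomicLong();
  private final AtomicLong refreshFailures = new AtomicLong();

  void onRefreshAttempt() { refreshAttempts.incrementAndGet(); }
  void onRefreshFailure() { refreshFailures.incrementAndGet(); }

  // Exposed so an application embedding the client can explain RPC spikes
  // that are not correlated with its own data path.
  long getRefreshAttempts() { return refreshAttempts.get(); }
  long getRefreshFailures() { return refreshFailures.get(); }
}
```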
##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
+          // have duplicate refreshes because once the thread is past the wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(WAIT_TIME_OUT_MS);
+          }
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(getMasters().get());
+          populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
Review comment:
Let me try again, and ask about the application lifecycle of this thread. It looks
like a thread we always want running, unless the Connection is being closed. Now that
there's a pool, I'm asking about the lifecycle of the pool, i.e. when its `shutdown`
method is called. I'm also asking about the presence of the thread -- is there a
reason the thread could be interrupted (and thus terminated) while the connection
remains active? There's only one call to `submit`, and nothing checks for the
presence of the thread. So can a connection enter a state where it's still alive with
no masterRefreshThread running?
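To make the question concrete, the lifecycle I'd expect looks roughly like the sketch below (illustrative names, not the patch's code): the pool is created and the refresher submitted exactly once in the constructor, and the only thing that ever interrupts the worker is `close()` via `shutdownNow()`. If an interrupt can arrive any other way, the connection can outlive its refresher.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of tying the refresher thread to the connection lifecycle. */
class RefresherLifecycle implements AutoCloseable {
  private final ExecutorService pool = Executors.newSingleThreadExecutor();

  RefresherLifecycle(Runnable masterEndPointRefresher) {
    // Submitted exactly once; nothing re-submits the task if the thread dies for another reason.
    pool.submit(masterEndPointRefresher);
  }

  @Override
  public void close() {
    // Intended to be the only source of the InterruptedException seen in the refresher loop:
    // shutdownNow() interrupts the worker, which then exits its while (!Thread.interrupted()) loop.
    pool.shutdownNow();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

If that matches the intent, it would be good to confirm that `shutdown`/`shutdownNow` is only reachable from the Connection's close path.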
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]