This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.3 by this push:
new 230bd2f HBASE-22686 ZkSplitLogWorkerCoordination doesn't allow a
regionserver to pick up all of the split work it is capable of
230bd2f is described below
commit 230bd2f968775dfbd553f02734c9b5ad219f8786
Author: Andrew Purtell <[email protected]>
AuthorDate: Fri Jul 12 12:26:00 2019 -0700
HBASE-22686 ZkSplitLogWorkerCoordination doesn't allow a regionserver to
pick up all of the split work it is capable of
---
.../coordination/ZkSplitLogWorkerCoordination.java | 74 ++++++++++------------
.../hbase/regionserver/TestSplitLogWorker.java | 47 --------------
2 files changed, 34 insertions(+), 87 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index 7e6708e..cf92ea8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -207,26 +207,26 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
* <p>
* @param path zk node for the task
*/
- private void grabTask(String path) {
+ private boolean grabTask(String path) {
Stat stat = new Stat();
byte[] data;
synchronized (grabTaskLock) {
currentTask = path;
workerInGrabTask = true;
if (Thread.interrupted()) {
- return;
+ return false;
}
}
try {
try {
if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
- return;
+ return false;
}
} catch (KeeperException e) {
LOG.warn("Failed to get data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
- return;
+ return false;
}
SplitLogTask slt;
try {
@@ -234,11 +234,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
} catch (DeserializationException e) {
LOG.warn("Failed parse data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
- return;
+ return false;
}
if (!slt.isUnassigned()) {
SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
- return;
+ return false;
}
currentVersion =
@@ -246,7 +246,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
slt.getMode(), stat.getVersion());
if (currentVersion < 0) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
- return;
+ return false;
}
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
@@ -257,7 +257,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()),
SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
- return;
+ return false;
}
LOG.info("worker " + server.getServerName() + " acquired task " + path);
@@ -274,6 +274,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
LOG.warn("Interrupted while yielding for other region servers", e);
Thread.currentThread().interrupt();
}
+ return true;
} finally {
synchronized (grabTaskLock) {
workerInGrabTask = false;
@@ -324,29 +325,8 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
server.getExecutorService().submit(hsh);
}
- /**
- * This function calculates how many splitters it could create based on expected average tasks per
- * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
- * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
- * @param numTasks current total number of available tasks
- */
- private int calculateAvailableSplitters(int numTasks) {
- // at lease one RS(itself) available
- int availableRSs = 1;
- try {
- List<String> regionServers =
- ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
- availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
- } catch (KeeperException e) {
- // do nothing
- LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
- }
-
- int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
- expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
- // calculate how many more splitters we could spawn
- return Math.min(expectedTasksPerRS, maxConcurrentTasks)
- - this.tasksInProgress.get();
+ private boolean areSplittersAvailable() {
+ return maxConcurrentTasks - tasksInProgress.get() > 0;
}
/**
@@ -424,21 +404,35 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
}
}
int numTasks = paths.size();
+ boolean taskGrabbed = false;
for (int i = 0; i < numTasks; i++) {
- int idx = (i + offset) % paths.size();
- // don't call ZKSplitLog.getNodeName() because that will lead to
- // double encoding of the path name
- if (this.calculateAvailableSplitters(numTasks) > 0) {
- grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
- } else {
- LOG.debug("Current region server " + server.getServerName() + " has "
- + this.tasksInProgress.get() + " tasks in progress and can't take more.");
- break;
+ while (!shouldStop) {
+ if (this.areSplittersAvailable()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Current region server " + server.getServerName()
+ + " is ready to take more tasks, will get task list and try grab tasks again.");
+ }
+ int idx = (i + offset) % paths.size();
+ // don't call ZKSplitLog.getNodeName() because that will lead to
+ // double encoding of the path name
+ taskGrabbed |= grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
+ break;
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Current region server " + server.getServerName() + " has "
+ + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+ }
+ Thread.sleep(100);
+ }
}
if (shouldStop) {
return;
}
}
+ if (!taskGrabbed && !shouldStop) {
+ // do not grab any tasks, sleep a little bit to reduce zk request.
+ Thread.sleep(1000);
+ }
SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
synchronized (taskReadyLock) {
while (seq_start == taskReadySeq.get()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 48be76b..e5f41ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -456,53 +456,6 @@ public class TestSplitLogWorker {
}
/**
- * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per
- * RS
- * @throws Exception
- */
- @Test(timeout=60000)
- public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
- LOG.info("testAcquireMultiTasks");
- SplitLogCounters.resetCounters();
- final String TATAS = "tatas";
- final ServerName RS = ServerName.valueOf("rs,1,1");
- final ServerName RS2 = ServerName.valueOf("rs,1,2");
- final int maxTasks = 3;
- Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
- testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
- RegionServerServices mockedRS = getRegionServer(RS);
-
- // create two RS nodes
- String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
- zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
- zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-
- for (int i = 0; i < maxTasks; i++) {
- zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
- new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
-
- SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
- slw.start();
- try {
- int acquiredTasks = 0;
- waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
- for (int i = 0; i < maxTasks; i++) {
- byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
- SplitLogTask slt = SplitLogTask.parseFrom(bytes);
- if (slt.isOwned(RS)) {
- acquiredTasks++;
- }
- }
- assertEquals(2, acquiredTasks);
- } finally {
- stopSplitLogWorker(slw);
- }
- }
-
- /**
* Create a mocked region server service instance
* @param server
* @return