This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1.4 by this push:
     new 5fd4f37  HBASE-22686 ZkSplitLogWorkerCoordination doesn't allow a 
regionserver to pick up all of the split work it is capable of
5fd4f37 is described below

commit 5fd4f37675d45e8b509684aea9fe2d6410a86e7b
Author: Andrew Purtell <[email protected]>
AuthorDate: Fri Jul 12 12:26:00 2019 -0700

    HBASE-22686 ZkSplitLogWorkerCoordination doesn't allow a regionserver to 
pick up all of the split work it is capable of
---
 .../coordination/ZkSplitLogWorkerCoordination.java | 74 ++++++++++------------
 .../hbase/regionserver/TestSplitLogWorker.java     | 47 --------------
 2 files changed, 34 insertions(+), 87 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index 7e6708e..cf92ea8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -207,26 +207,26 @@ public class ZkSplitLogWorkerCoordination extends 
ZooKeeperListener implements
    * <p>
    * @param path zk node for the task
    */
-  private void grabTask(String path) {
+  private boolean grabTask(String path) {
     Stat stat = new Stat();
     byte[] data;
     synchronized (grabTaskLock) {
       currentTask = path;
       workerInGrabTask = true;
       if (Thread.interrupted()) {
-        return;
+        return false;
       }
     }
     try {
       try {
         if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
           
SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
-          return;
+          return false;
         }
       } catch (KeeperException e) {
         LOG.warn("Failed to get data for znode " + path, e);
         
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
-        return;
+        return false;
       }
       SplitLogTask slt;
       try {
@@ -234,11 +234,11 @@ public class ZkSplitLogWorkerCoordination extends 
ZooKeeperListener implements
       } catch (DeserializationException e) {
         LOG.warn("Failed parse data for znode " + path, e);
         
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
-        return;
+        return false;
       }
       if (!slt.isUnassigned()) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
-        return;
+        return false;
       }
 
       currentVersion =
@@ -246,7 +246,7 @@ public class ZkSplitLogWorkerCoordination extends 
ZooKeeperListener implements
             slt.getMode(), stat.getVersion());
       if (currentVersion < 0) {
         
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
-        return;
+        return false;
       }
 
       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
@@ -257,7 +257,7 @@ public class ZkSplitLogWorkerCoordination extends 
ZooKeeperListener implements
 
         endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()),
           SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
-        return;
+        return false;
       }
 
       LOG.info("worker " + server.getServerName() + " acquired task " + path);
@@ -274,6 +274,7 @@ public class ZkSplitLogWorkerCoordination extends 
ZooKeeperListener implements
         LOG.warn("Interrupted while yielding for other region servers", e);
         Thread.currentThread().interrupt();
       }
+      return true;
     } finally {
       synchronized (grabTaskLock) {
         workerInGrabTask = false;
@@ -324,29 +325,8 @@ public class ZkSplitLogWorkerCoordination extends 
ZooKeeperListener implements
     server.getExecutorService().submit(hsh);
   }
 
-  /**
-   * This function calculates how many splitters it could create based on 
expected average tasks per
-   * RS and the hard limit upper bound(maxConcurrentTasks) set by 
configuration. <br>
-   * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper 
Bound)
-   * @param numTasks current total number of available tasks
-   */
-  private int calculateAvailableSplitters(int numTasks) {
-    // at lease one RS(itself) available
-    int availableRSs = 1;
-    try {
-      List<String> regionServers =
-          ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
-      availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : 
regionServers.size());
-    } catch (KeeperException e) {
-      // do nothing
-      LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
-    }
-
-    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % 
availableRSs == 0) ? 0 : 1);
-    expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
-    // calculate how many more splitters we could spawn
-    return Math.min(expectedTasksPerRS, maxConcurrentTasks)
-        - this.tasksInProgress.get();
+  private boolean areSplittersAvailable() {
+    return maxConcurrentTasks - tasksInProgress.get() > 0;
   }
 
   /**
@@ -424,21 +404,35 @@ public class ZkSplitLogWorkerCoordination extends 
ZooKeeperListener implements
         }
       }
       int numTasks = paths.size();
+      boolean taskGrabbed = false;
       for (int i = 0; i < numTasks; i++) {
-        int idx = (i + offset) % paths.size();
-        // don't call ZKSplitLog.getNodeName() because that will lead to
-        // double encoding of the path name
-        if (this.calculateAvailableSplitters(numTasks) > 0) {
-          grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
-        } else {
-          LOG.debug("Current region server " + server.getServerName() + " has "
-              + this.tasksInProgress.get() + " tasks in progress and can't 
take more.");
-          break;
+        while (!shouldStop) {
+          if (this.areSplittersAvailable()) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Current region server " + server.getServerName()
+                + " is ready to take more tasks, will get task list and try 
grab tasks again.");
+            }
+            int idx = (i + offset) % paths.size();
+            // don't call ZKSplitLog.getNodeName() because that will lead to
+            // double encoding of the path name
+            taskGrabbed |= grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, 
paths.get(idx)));
+            break;
+          } else {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Current region server " + server.getServerName() + " 
has "
+                + this.tasksInProgress.get() + " tasks in progress and can't 
take more.");
+            }
+            Thread.sleep(100);
+          }
         }
         if (shouldStop) {
           return;
         }
       }
+      if (!taskGrabbed && !shouldStop) {
+        // No task was grabbed this pass; sleep briefly to reduce ZK request load.
+        Thread.sleep(1000);
+      }
       SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
       synchronized (taskReadyLock) {
         while (seq_start == taskReadySeq.get()) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 48be76b..e5f41ec 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -456,53 +456,6 @@ public class TestSplitLogWorker {
   }
 
   /**
-   * The test checks SplitLogWorker should not spawn more splitters than 
expected num of tasks per
-   * RS
-   * @throws Exception
-   */
-  @Test(timeout=60000)
-  public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
-    LOG.info("testAcquireMultiTasks");
-    SplitLogCounters.resetCounters();
-    final String TATAS = "tatas";
-    final ServerName RS = ServerName.valueOf("rs,1,1");
-    final ServerName RS2 = ServerName.valueOf("rs,1,2");
-    final int maxTasks = 3;
-    Configuration testConf = 
HBaseConfiguration.create(TEST_UTIL.getConfiguration());
-    testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
-    RegionServerServices mockedRS = getRegionServer(RS);
-
-    // create two RS nodes
-    String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
-    zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL);
-    rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
-    zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL);
-
-    for (int i = 0; i < maxTasks; i++) {
-      zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, 
TATAS + i),
-        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), 
this.mode).toByteArray(),
-          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    }
-
-    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, 
neverEndingTask);
-    slw.start();
-    try {
-      int acquiredTasks = 0;
-      waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
-      for (int i = 0; i < maxTasks; i++) {
-        byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, 
TATAS + i));
-        SplitLogTask slt = SplitLogTask.parseFrom(bytes);
-        if (slt.isOwned(RS)) {
-          acquiredTasks++;
-        }
-      }
-      assertEquals(2, acquiredTasks);
-    } finally {
-      stopSplitLogWorker(slw);
-    }
-  }
-
-  /**
    * Create a mocked region server service instance
    * @param server
    * @return

Reply via email to