Author: liyin
Date: Thu Apr 17 00:48:58 2014
New Revision: 1588112

URL: http://svn.apache.org/r1588112
Log:
[HBASE-10975] HtableMultiplexer to use executor

Author: adela

Summary:
Add a ScheduledExecutorService to handle worker-thread creation and periodic
execution, instead of spawning a dedicated new Thread per region server.
Also making the code a little bit smaller with that :)

Test Plan: created a unit test which spawns 10 client threads doing 10k puts 
each

Reviewers: liyintang, daviddeng

Reviewed By: daviddeng

CC: hbase-eng@, daviddeng

Differential Revision: https://phabricator.fb.com/D1273528

Modified:
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
    
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1588112&r1=1588111&r2=1588112&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
 Thu Apr 17 00:48:58 2014
@@ -31,7 +31,10 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -43,6 +46,9 @@ import org.apache.hadoop.hbase.HRegionLo
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * HTableMultiplexer provides a thread-safe non blocking PUT API across all 
the tables.
@@ -60,8 +66,6 @@ import org.apache.hadoop.hbase.util.Byte
  */
 public class HTableMultiplexer {
   private static final Log LOG = 
LogFactory.getLog(HTableMultiplexer.class.getName());
-  private static int poolID = 0;
-
   private Map<byte[], HTable> tableNameToHTableMap;
 
   /** The map between each region server to its corresponding buffer queue */
@@ -75,6 +79,10 @@ public class HTableMultiplexer {
   private HConnection connection;
   private int retryNum;
   private int perRegionServerBufferQueueSize;
+  private ScheduledExecutorService executor;
+  private long frequency = 100;
+  //initial number of threads in the pool
+  public static final int INITIAL_NUM_THREADS = 10;
   
   /**
    * 
@@ -92,6 +100,12 @@ public class HTableMultiplexer {
             Bytes.BYTES_COMPARATOR);
     this.retryNum = conf.getInt("hbase.client.retries.number", 10);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
+    this.frequency = conf.getLong("hbase.htablemultiplexer.flush.frequency.ms",
+        100);
+    this.executor = Executors.newScheduledThreadPool(
+        INITIAL_NUM_THREADS,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("HTableFlushWorker-%d").build());
   }
 
   /**
@@ -219,14 +233,7 @@ public class HTableMultiplexer {
       HTableFlushWorker worker = new HTableFlushWorker(conf, addr,
           this.connection, this, queue);
       this.serverToFlushWorkerMap.put(addr, worker);
-
-      // Launch a daemon thread to flush the puts
-      // from the queue to its corresponding region server.
-      String name = "HTableFlushWorker-" + addr.getHostNameWithPort() + "-"
-          + (poolID++);
-      Thread t = new Thread(worker, name);
-      t.setDaemon(true);
-      t.start();
+      executor.scheduleAtFixedRate(worker, frequency, frequency, TimeUnit.MILLISECONDS);
     }
     return queue;
   }
@@ -652,129 +659,101 @@ public class HTableMultiplexer {
 
     @Override
     public void run() {
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+      long elapsed = 0;
       List<PutStatus> processingList = new ArrayList<PutStatus>();
-      /** 
-       * The frequency in milliseconds for the current thread to process the 
corresponding  
-       * buffer queue.  
-       **/
-      long frequency = 
conf.getLong("hbase.htablemultiplexer.flush.frequency.ms", 100);
-      
-      // initial delay
+      int completelyFailed = 0;
       try {
-        Thread.sleep(frequency);
-      } catch (InterruptedException e) {
-      } // Ignore
-
-      long start, elapsed;
-      int failedCount = 0;
-      while (true) {
-        try {
-          start = elapsed = System.currentTimeMillis();
-
-          // Clear the processingList, putToStatusMap and failedCount
-          processingList.clear();
-          failedCount = 0;
-          
-          // drain all the queued puts into the tmp list
-          queue.drainTo(processingList);
-          currentProcessingPutCount.set(processingList.size());
-          if (minProcessingPutCount.get() > currentProcessingPutCount.get()) {
-            minProcessingPutCount.set(currentProcessingPutCount.get());
-          } else if (maxProcessingPutCount.get() < 
currentProcessingPutCount.get()) {
-            maxProcessingPutCount.set(currentProcessingPutCount.get());
+        // drain all the queued puts into the tmp list
+        queue.drainTo(processingList);
+        currentProcessingPutCount.set(processingList.size());
+        if (minProcessingPutCount.get() > currentProcessingPutCount.get()) {
+          minProcessingPutCount.set(currentProcessingPutCount.get());
+        } else if (maxProcessingPutCount.get() < currentProcessingPutCount
+            .get()) {
+          maxProcessingPutCount.set(currentProcessingPutCount.get());
+        }
+        avgProcessingPutCount.add(currentProcessingPutCount.get());
+        if (processingList.size() > 0) {
+          MultiPut mput = new MultiPut(this.addr);
+          HBaseRPCOptions options = null;
+          for (PutStatus putStatus : processingList) {
+            // Update the MultiPut
+            mput.add(putStatus.getRegionInfo().getRegionName(),
+                putStatus.getPut());
+            if (putStatus.getOptions() != null) {
+              options = putStatus.getOptions();
+            }
           }
-          avgProcessingPutCount.add(currentProcessingPutCount.get());
-          if (processingList.size() > 0) {
-            // Create the MultiPut object
-            // Amit: Need to change this to use multi, at some point in future.
-            MultiPut mput = new MultiPut(this.addr);
-            HBaseRPCOptions options = null;
-            for (PutStatus putStatus: processingList) {
-              // Update the MultiPut
-              mput.add(putStatus.getRegionInfo().getRegionName(), 
-                  putStatus.getPut());
-              if (putStatus.getOptions () != null) {
-                options = putStatus.getOptions ();
+
+          // Process this multiput request
+          List<Put> failed = null;
+          Map<String, HRegionFailureInfo> failureInfo = new HashMap<String, 
HRegionFailureInfo>();
+          try {
+            failed = connection.processListOfMultiPut(Arrays.asList(mput),
+                null, options, failureInfo);
+          } catch (PreemptiveFastFailException e) {
+            // Client is not blocking on us. So, let us treat this
+            // as a normal failure, and retry.
+            for (PutStatus putStatus : processingList) {
+              if (!resubmitFailedPut(putStatus, this.addr)) {
+                completelyFailed++;
               }
             }
-            
-            // Process this multiput request
-            List<Put> failed = null;
-            Map<String, HRegionFailureInfo> failureInfo =
-                new HashMap<String, HRegionFailureInfo>();
-            try {
-              failed = connection.processListOfMultiPut(Arrays.asList(mput), 
null, options,
-                  failureInfo);
-            } catch(PreemptiveFastFailException e) {
-              // Client is not blocking on us. So, let us treat this
-              // as a normal failure, and retry.
-              for (PutStatus putStatus: processingList) {
+          }
+
+          long putsToRetry = 0;
+          if (failed != null) {
+            if (failed.size() == processingList.size()) {
+              // All the puts for this region server are failed. Going to retry
+              // it later
+              for (PutStatus putStatus : processingList) {
                 if (!resubmitFailedPut(putStatus, this.addr)) {
-                  failedCount++;
+                  completelyFailed++;
                 }
               }
-            }
-
-            long putsToRetry = 0;
-            if (failed != null) {
-              if (failed.size() == processingList.size()) {
-                // All the puts for this region server are failed. Going to 
retry it later
-                for (PutStatus putStatus: processingList) {
-                  if (!resubmitFailedPut(putStatus, this.addr)) {
-                    failedCount++;
-                  }
-                }
-              } else {
-                Set<Put> failedPutSet = new HashSet<Put>(failed);
-                for (PutStatus putStatus: processingList) {
-                  if (failedPutSet.contains(putStatus.getPut())
-                      && !resubmitFailedPut(putStatus, this.addr)) {
-                    failedCount++;
-                  }
+            } else {
+              Set<Put> failedPutSet = new HashSet<Put>(failed);
+              for (PutStatus putStatus : processingList) {
+                if (failedPutSet.contains(putStatus.getPut())
+                    && !resubmitFailedPut(putStatus, this.addr)) {
+                  completelyFailed++;
                 }
               }
-              putsToRetry = failed.size() - failedCount;
-            }
-            // Update the totalFailedCount
-            this.totalFailedPutCount.addAndGet(failedCount);
-            // Update the totalSucceededPutCount
-            this.totalSucceededPutCount.addAndGet(processingList.size() -
-                                                  failedCount - putsToRetry);
-            // Updated the total retried put counts.
-            this.totalRetriedPutCount.addAndGet(putsToRetry);
-            
-            elapsed = System.currentTimeMillis() - start;
-            // Update latency counters
-            averageLatency.add(elapsed);
-            if (elapsed > maxLatency.get()) {
-              maxLatency.set(elapsed);
-            }
-
-            // Log some basic info
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Processed " + currentProcessingPutCount
-                  + " put requests for " + addr.getHostNameWithPort()
-                  + " and " + failedCount + " failed"
-                  + ", latency for this send: " + elapsed);
             }
-            
-            // Reset the current processing put count
-            currentProcessingPutCount.set(0);
+            putsToRetry = failed.size() - completelyFailed;
           }
-
-          // Sleep for a while
-          if (elapsed == start) {
-            elapsed = System.currentTimeMillis() - start;
+          // Update the totalFailedCount
+          this.totalFailedPutCount.addAndGet(completelyFailed);
+          // Update the totalSucceededPutCount
+          this.totalSucceededPutCount.addAndGet(processingList.size()
+              - completelyFailed - putsToRetry);
+          // Updated the total retried put counts.
+          this.totalRetriedPutCount.addAndGet(putsToRetry);
+
+          elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
+          // Update latency counters
+          averageLatency.add(elapsed);
+          if (elapsed > maxLatency.get()) {
+            maxLatency.set(elapsed);
           }
-          if (elapsed < frequency) {
-            Thread.sleep(frequency - elapsed);
+
+          // Log some basic info
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Processed " + currentProcessingPutCount
+                + " put requests for " + addr.getHostNameWithPort() + " and "
+                + completelyFailed + " failed" + ", latency for this send: "
+                + elapsed);
           }
-        } catch (Exception e) {
-          // Log all the exceptions and move on
-          LOG.debug("Caught some exceptions " + e
-              + " when flushing puts to region server "
-              + addr.getHostNameWithPort());
+
+          // Reset the current processing put count
+          currentProcessingPutCount.set(0);
         }
+      } catch (Exception e) {
+        // Log all the exceptions and move on
+        LOG.debug("Caught some exceptions " + e
+            + " when flushing puts to region server "
+            + addr.getHostNameWithPort());
       }
     }
   }

Modified: 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java?rev=1588112&r1=1588111&r2=1588112&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
 (original)
+++ 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
 Thu Apr 17 00:48:58 2014
@@ -19,6 +19,14 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
@@ -32,9 +40,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
 public class TestHTableMultiplexer {
   final Log LOG = LogFactory.getLog(getClass());
@@ -233,5 +238,116 @@ public class TestHTableMultiplexer {
     multiplexer.put(TABLE1, put, HBaseRPCOptions.DEFAULT);
     Assert.assertEquals("storedHTableCount", 1, status.getStoredHTableCount());
   }
+
+  /**
+   * Test when multiple client threads are using HTableMultiplexer. Spawn 10
+   * threads that do 10k multiputs each, and check in the end that we got
+   * expected number of results back when we do Gets.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMultipleThreads() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong("hbase.htablemultiplexer.flush.frequency.ms", 10);
+    byte[] TABLE = Bytes.toBytes("testMultipleThreads");
+    HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY });
+    HTableMultiplexer multiplexer = new HTableMultiplexer(
+        TEST_UTIL.getConfiguration(), 1000);
+    ExecutorService executor = Executors.newFixedThreadPool(10);
+    List<Future<?>> futures = new ArrayList<>(10);
+    byte[] rowPrefix = Bytes.toBytes("row");
+    for (int i = 0; i < 10; i++) {
+      byte[] suffix = Bytes.toBytes(i);
+      byte[] row = Bytes.add(rowPrefix, suffix);
+      Runnable runnable = new Client(multiplexer, TABLE, row);
+      Future<?> future = executor.submit(runnable);
+      futures.add(future);
+    }
+    for (Future<?> f : futures) {
+      f.get();
+    }
+    // Wait for multiplexer flush
+    Thread.sleep(2000);
+    for (int i = 0; i < 10; i++) {
+      byte[] suffix = Bytes.toBytes(i);
+      byte[] row = Bytes.add(rowPrefix, suffix);
+      checkForGets(ht, row);
+    }
+    // check the latencies
+    HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
+    System.out.println("max latency: " + status.getMaxLatency());
+  }
+
+  /**
+   * Utility method to check if we got all the data back after putting with
+   * multiplexer
+   */
+  public void checkForGets(HTable ht, byte[] row) throws IOException {
+    for (int i = 0; i < 10000; i++) {
+
+      byte[] suffix = Bytes.toBytes(i);
+      byte[] exactRow = Bytes.add(row, suffix);
+
+      Get get = new Get(exactRow);
+      Result r = ht.get(get);
+      Assert.assertEquals(1, r.getKvs().size());
+      Assert.assertEquals(Bytes.toString(exactRow),
+          Bytes.toString(r.getKvs().get(0).getValue()));
+    }
+  }
+
+  /**
+   * A client which is doing 10k puts via multiplexer
+   *
+   */
+  public static class Client implements Runnable {
+    private HTableMultiplexer multiPlex;
+    private byte[] ht;
+    private byte[] row;
+    private byte[] dummy = Bytes.toBytes("dummy");
+
+    public Client(HTableMultiplexer multiPlex, byte[] ht, byte[] row) {
+      this.multiPlex = multiPlex;
+      this.ht = ht;
+      this.row = row;
+    }
+
+    @Override
+    public void run() {
+      int maxTry = 0;
+      for (int i = 0; i < 10000; i++) {
+        try {
+          // sleeping so that we don't put a whole bunch of data at once
+          Thread.sleep(1);
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
+        }
+        byte[] suffix = Bytes.toBytes(i);
+        byte[] exactRow = Bytes.add(row, suffix);
+        Put put = new Put(exactRow);
+        put.add(FAMILY, dummy, exactRow);
+        try {
+          boolean success = true;
+          int numTry = 0;
+          while (true) {
+            success = multiPlex.put(ht, put, HBaseRPCOptions.DEFAULT);
+            numTry++;
+            if (success)
+              break;
+            else
+              Thread.sleep(1000);
+          }
+          if (numTry > maxTry)
+            maxTry = numTry;
+        } catch (IOException e) {
+          e.printStackTrace();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      System.out.println("Max number of times this thread retried: " + maxTry);
+    }
+  }
 }
 


Reply via email to