Repository: hbase
Updated Branches:
  refs/heads/0.98 9e486bfd4 -> e5ecfb944


HBASE-12104 Some optimization and bugfix for HTableMultiplexer (Yi Deng)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5ecfb94
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5ecfb94
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5ecfb94

Branch: refs/heads/0.98
Commit: e5ecfb944029c2d0f11a0aaf6b47a1bd4daf78cf
Parents: 9e486bf
Author: Ted Yu <te...@apache.org>
Authored: Mon Oct 6 22:34:16 2014 +0000
Committer: Ted Yu <te...@apache.org>
Committed: Mon Oct 6 22:34:16 2014 +0000

----------------------------------------------------------------------
 .../hadoop/hbase/client/HTableMultiplexer.java  | 396 ++++++++++---------
 1 file changed, 209 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e5ecfb94/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 340e3a5..fe450a3 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -28,23 +28,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncProcessCallback;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * HTableMultiplexer provides a thread-safe non blocking PUT API across all 
the tables.
  * Each put will be sharded into different buffer queues based on its 
destination region server.
@@ -63,24 +68,25 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 @InterfaceStability.Evolving
 public class HTableMultiplexer {
   private static final Log LOG = 
LogFactory.getLog(HTableMultiplexer.class.getName());
-  private static int poolID = 0;
   
-  static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = 
"hbase.tablemultiplexer.flush.frequency.ms";
-
-  /** The map between each region server to its corresponding buffer queue */
-  private final Map<HRegionLocation, LinkedBlockingQueue<PutStatus>> 
serverToBufferQueueMap =
-      new ConcurrentHashMap<HRegionLocation, LinkedBlockingQueue<PutStatus>>();
+  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
+      "hbase.tablemultiplexer.flush.period.ms";
+  public static final String TABLE_MULTIPLEXER_INIT_THREADS = 
"hbase.tablemultiplexer.init.threads";
+  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
+      "hbase.client.max.retries.in.queue";
 
   /** The map between each region server to its flush worker */
-  private final Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap 
=
-      new ConcurrentHashMap<HRegionLocation, HTableFlushWorker>();
+  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
+      new ConcurrentHashMap<HRegionLocation, FlushWorker>();
 
   private final Configuration conf;
   private final HConnection conn;
   private final ExecutorService pool;
   private final int retryNum;
-  private int perRegionServerBufferQueueSize;
+  private final int perRegionServerBufferQueueSize;
   private final int maxKeyValueSize;
+  private final ScheduledExecutorService executor;
+  private final long flushPeriod;
   
   /**
    * @param conf The HBaseConfiguration
@@ -96,6 +102,11 @@ public class HTableMultiplexer {
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
     this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
+    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
+    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
+    this.executor =
+        Executors.newScheduledThreadPool(initThreads,
+          new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
   }
 
   /**
@@ -106,7 +117,7 @@ public class HTableMultiplexer {
    * @return true if the request can be accepted by its corresponding buffer 
queue.
    * @throws IOException
    */
-  public boolean put(TableName tableName, final Put put) throws IOException {
+  public boolean put(TableName tableName, final Put put) {
     return put(tableName, put, this.retryNum);
   }
 
@@ -118,8 +129,7 @@ public class HTableMultiplexer {
    * @return the list of puts which could not be queued
    * @throws IOException
    */
-  public List<Put> put(TableName tableName, final List<Put> puts)
-      throws IOException {
+  public List<Put> put(TableName tableName, final List<Put> puts) {
     if (puts == null)
       return null;
     
@@ -140,23 +150,22 @@ public class HTableMultiplexer {
     return failedPuts;
   }
 
-  public List<Put> put(byte[] tableName, final List<Put> puts) throws 
IOException {
+  /**
+   * Deprecated. Use {@link #put(TableName, List) } instead.
+   */
+  @Deprecated
+  public List<Put> put(byte[] tableName, final List<Put> puts) {
     return put(TableName.valueOf(tableName), puts);
   }
-
-
+  
   /**
    * The put request will be buffered by its corresponding buffer queue. And 
the put request will be
    * retried before dropping the request.
    * Return false if the queue is already full.
-   * @param tableName
-   * @param put
-   * @param retry
    * @return true if the request can be accepted by its corresponding buffer 
queue.
    * @throws IOException
    */
-  public boolean put(final TableName tableName, final Put put, int retry)
-      throws IOException {
+  public boolean put(final TableName tableName, final Put put, int retry) {
     if (retry <= 0) {
       return false;
     }
@@ -166,25 +175,36 @@ public class HTableMultiplexer {
       HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), 
false);
       if (loc != null) {
         // Add the put pair into its corresponding queue.
-
         LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
+
         // Generate a MultiPutStatus object and offer it into the queue
         PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
         
         return queue.offer(s);
       }
-    } catch (Exception e) {
-      LOG.debug("Cannot process the put " + put + " because of " + e);
+    } catch (IOException e) {
+      LOG.debug("Cannot process the put " + put, e);
     }
     return false;
   }
 
-  public boolean put(final byte[] tableName, final Put put, int retry)
-      throws IOException {
+  /**
+   * Deprecated. Use {@link #put(TableName, Put) } instead.
+   */
+  @Deprecated
+  public boolean put(final byte[] tableName, final Put put, int retry) {
     return put(TableName.valueOf(tableName), put, retry);
   }
 
   /**
+   * Deprecated. Use {@link #put(TableName, Put)} instead.
+   */
+  @Deprecated
+  public boolean put(final byte[] tableName, Put put) {
+    return put(TableName.valueOf(tableName), put);
+  }
+  
+  /**
    * @return the current HTableMultiplexerStatus
    */
   public HTableMultiplexerStatus getHTableMultiplexerStatus() {
@@ -192,30 +212,20 @@ public class HTableMultiplexer {
   }
 
   private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
-    LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.get(addr);
-    if (queue == null) {
-      synchronized (this.serverToBufferQueueMap) {
-        queue = serverToBufferQueueMap.get(addr);
-        if (queue == null) {
-          // Create a queue for the new region server
-          queue = new 
LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
-          serverToBufferQueueMap.put(addr, queue);
-
+    FlushWorker worker = serverToFlushWorkerMap.get(addr);
+    if (worker == null) {
+      synchronized (this.serverToFlushWorkerMap) {
+        worker = serverToFlushWorkerMap.get(addr);
+        if (worker == null) {
           // Create the flush worker
-          HTableFlushWorker worker =
-              new HTableFlushWorker(conf, this.conn, addr, this, queue, pool);
+          worker = new FlushWorker(conf, this.conn, addr, this, 
perRegionServerBufferQueueSize,
+                  pool, executor);
           this.serverToFlushWorkerMap.put(addr, worker);
-
-          // Launch a daemon thread to flush the puts
-          // from the queue to its corresponding region server.
-          String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + 
(poolID++);
-          Thread t = new Thread(worker, name);
-          t.setDaemon(true);
-          t.start();
+          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, 
TimeUnit.MILLISECONDS);
         }
       }
     }
-    return queue;
+    return worker.getQueue();
   }
 
   /**
@@ -223,7 +233,7 @@ public class HTableMultiplexer {
    * report the number of buffered requests and the number of the failed 
(dropped) requests
    * in total or on per region server basis.
    */
-  static class HTableMultiplexerStatus {
+  public static class HTableMultiplexerStatus {
     private long totalFailedPutCounter;
     private long totalBufferedPutCounter;
     private long maxLatency;
@@ -234,7 +244,7 @@ public class HTableMultiplexer {
     private Map<String, Long> serverToMaxLatencyMap;
 
     public HTableMultiplexerStatus(
-        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
       this.totalBufferedPutCounter = 0;
       this.totalFailedPutCounter = 0;
       this.maxLatency = 0;
@@ -247,17 +257,17 @@ public class HTableMultiplexer {
     }
 
     private void initialize(
-        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
       if (serverToFlushWorkerMap == null) {
         return;
       }
 
       long averageCalcSum = 0;
       int averageCalcCount = 0;
-      for (Map.Entry<HRegionLocation, HTableFlushWorker> entry : 
serverToFlushWorkerMap
+      for (Map.Entry<HRegionLocation, FlushWorker> entry : 
serverToFlushWorkerMap
           .entrySet()) {
         HRegionLocation addr = entry.getKey();
-        HTableFlushWorker worker = entry.getValue();
+        FlushWorker worker = entry.getValue();
 
         long bufferedCounter = worker.getTotalBufferedCount();
         long failedCounter = worker.getTotalFailedCount();
@@ -325,25 +335,15 @@ public class HTableMultiplexer {
   }
   
   private static class PutStatus {
-    private final HRegionInfo regionInfo;
-    private final Put put;
-    private final int retryCount;
-    public PutStatus(final HRegionInfo regionInfo, final Put put,
-        final int retryCount) {
+    public final HRegionInfo regionInfo;
+    public final Put put;
+    public final int retryCount;
+
+    public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
       this.regionInfo = regionInfo;
       this.put = put;
       this.retryCount = retryCount;
     }
-
-    public HRegionInfo getRegionInfo() {
-      return regionInfo;
-    }
-    public Put getPut() {
-      return put;
-    }
-    public int getRetryCount() {
-      return retryCount;
-    }
   }
 
   /**
@@ -386,28 +386,41 @@ public class HTableMultiplexer {
     }
   }
 
-  private static class HTableFlushWorker implements Runnable, 
AsyncProcessCallback<Object> {
+  private static class FlushWorker implements Runnable, 
AsyncProcessCallback<Object> {
     private final HRegionLocation addr;
-    private final Configuration conf;
     private final LinkedBlockingQueue<PutStatus> queue;
-    private final HTableMultiplexer htableMultiplexer;
+    private final HTableMultiplexer multiplexer;
     private final AtomicLong totalFailedPutCount = new AtomicLong(0);
-    private final AtomicInteger currentProcessingPutCount = new 
AtomicInteger(0);
+    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
     private final AtomicAverageCounter averageLatency = new 
AtomicAverageCounter();
     private final AtomicLong maxLatency = new AtomicLong(0);
+
     private final AsyncProcess<Object> ap;
     private final List<Object> results = new ArrayList<Object>();
+    private final List<PutStatus> processingList = new ArrayList<PutStatus>();
+    private final ScheduledExecutorService executor;
+    private final int maxRetryInQueue;
+    private final AtomicInteger retryInQueue = new AtomicInteger(0);
+    private final int rpcTimeOutMs;
     
-    public HTableFlushWorker(Configuration conf, HConnection conn, 
HRegionLocation addr,
-        HTableMultiplexer htableMultiplexer, LinkedBlockingQueue<PutStatus> 
queue, ExecutorService pool) {
+    public FlushWorker(Configuration conf, HConnection conn, HRegionLocation 
addr,
+        HTableMultiplexer multiplexer, int perRegionServerBufferQueueSize,
+        ExecutorService pool, ScheduledExecutorService executor) {
       this.addr = addr;
-      this.conf = conf;
-      this.htableMultiplexer = htableMultiplexer;
-      this.queue = queue;
+      this.multiplexer = multiplexer;
+      this.queue = new 
LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
       RpcRetryingCallerFactory rpcCallerFactory = 
RpcRetryingCallerFactory.instantiate(conf);
       RpcControllerFactory rpcControllerFactory = 
RpcControllerFactory.instantiate(conf);
       this.ap = new AsyncProcess<Object>(conn, null, pool, this, conf, 
rpcCallerFactory,
               rpcControllerFactory);
+      this.executor = executor;
+      this.maxRetryInQueue = 
conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
+      this.rpcTimeOutMs =
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    }
+
+    protected LinkedBlockingQueue<PutStatus> getQueue() {
+      return this.queue;
     }
 
     public long getTotalFailedCount() {
@@ -415,7 +428,7 @@ public class HTableMultiplexer {
     }
 
     public long getTotalBufferedCount() {
-      return queue.size() + currentProcessingPutCount.get();
+      return queue.size() + currentProcessingCount.get();
     }
 
     public AtomicAverageCounter getAverageLatencyCounter() {
@@ -426,141 +439,150 @@ public class HTableMultiplexer {
       return this.maxLatency.getAndSet(0);
     }
 
-    private boolean resubmitFailedPut(PutStatus failedPutStatus,
-        HRegionLocation oldLoc) throws IOException {
-      Put failedPut = failedPutStatus.getPut();
-      // The currentPut is failed. So get the table name for the currentPut.
-      TableName tableName = failedPutStatus.getRegionInfo().getTable();
+    private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) 
throws IOException {
       // Decrease the retry count
-      int retryCount = failedPutStatus.getRetryCount() - 1;
+      final int retryCount = ps.retryCount - 1;
       
       if (retryCount <= 0) {
         // Update the failed counter and no retry any more.
         return false;
-      } else {
-        // Retry one more time
-        return this.htableMultiplexer.put(tableName, failedPut, retryCount);
       }
+
+      int cnt = retryInQueue.incrementAndGet();
+      if (cnt > maxRetryInQueue) {
+        // Too many Puts in queue for resubmit, give up this
+        retryInQueue.decrementAndGet();
+        return false;
+      }
+
+      final Put failedPut = ps.put;
+      // The currentPut is failed. So get the table name for the currentPut.
+      final TableName tableName = ps.regionInfo.getTable();
+
+      // Wait at least RPC timeout time
+      long delayMs = rpcTimeOutMs;
+      delayMs = Math.max(delayMs, (long) (multiplexer.flushPeriod * Math.pow(2,
+              multiplexer.retryNum - retryCount)));
+
+      executor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          boolean succ = false;
+          try {
+            succ = FlushWorker.this.multiplexer.put(tableName, failedPut, 
retryCount);
+          } finally {
+            FlushWorker.this.retryInQueue.decrementAndGet();
+            if (!succ) {
+              FlushWorker.this.totalFailedPutCount.incrementAndGet();
+            }
+          }
+        }
+      }, delayMs, TimeUnit.MILLISECONDS);
+      return true;
     }
 
     @Override
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings
-        (value = "REC_CATCH_EXCEPTION", justification = "na")
     public void run() {
-      List<PutStatus> processingList = new ArrayList<PutStatus>();
-      /** 
-       * The frequency in milliseconds for the current thread to process the 
corresponding  
-       * buffer queue.  
-       **/
-      long frequency = conf.getLong(TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100);
-      
-      // initial delay
-      try {
-        Thread.sleep(frequency);
-      } catch (InterruptedException e) {
-      } // Ignore
-
-      long start, elapsed;
       int failedCount = 0;
-      while (true) {
-        try {
-          start = elapsed = EnvironmentEdgeManager.currentTimeMillis();
+      try {
+        long start = EnvironmentEdgeManager.currentTimeMillis();
 
-          // Clear the processingList, putToStatusMap and failedCount
-          processingList.clear();
-          failedCount = 0;
+        // Clear the processingList, putToStatusMap and failedCount
+        processingList.clear();
+        failedCount = 0;
+        
+        // drain all the queued puts into the tmp list
+        queue.drainTo(processingList);
+        currentProcessingCount.set(processingList.size());
+
+        if (processingList.size() > 0) {
+          this.results.clear();
+          List<Action<Row>> retainedActions = new 
ArrayList<Action<Row>>(processingList.size());
+          MultiAction<Row> actions = new MultiAction<Row>();
+          for (int i = 0; i < processingList.size(); i++) {
+            PutStatus putStatus = processingList.get(i);
+            Action<Row> action = new Action<Row>(putStatus.put, i);
+            actions.add(putStatus.regionInfo.getRegionName(), action);
+            retainedActions.add(action);
+            this.results.add(null);
+          }
           
-          // drain all the queued puts into the tmp list
-          queue.drainTo(processingList);
-          currentProcessingPutCount.set(processingList.size());
-
-          if (processingList.size() > 0) {
-            this.results.clear();
-            List<Action<Row>> retainedActions = new 
ArrayList<Action<Row>>(processingList.size());
-            MultiAction<Row> actions = new MultiAction<Row>();
-            for (int i = 0; i < processingList.size(); i++) {
-              PutStatus putStatus = processingList.get(i);
-              Action<Row> action = new Action<Row>(putStatus.getPut(), i);
-              actions.add(putStatus.getRegionInfo().getRegionName(), action);
-              retainedActions.add(action);
-              this.results.add(null);
+          // Process this multi-put request
+          List<PutStatus> failed = null;
+          Map<HRegionLocation, MultiAction<Row>> actionsByServer =
+              Collections.singletonMap(addr, actions);
+          try {
+            HConnectionManager.ServerErrorTracker errorsByServer =
+                new HConnectionManager.ServerErrorTracker(1, 10);
+            ap.sendMultiAction(retainedActions, actionsByServer, 10, 
errorsByServer);
+            ap.waitUntilDone();
+
+            if (ap.hasError()) {
+              // We just log and ignore the exception here since failed Puts 
will be resubmit again.
+              LOG.debug("Caught some exceptions when flushing puts to region 
server "
+                  + addr.getHostnamePort(), ap.getErrors());
             }
-            
-            // Process this multi-put request
-            List<PutStatus> failed = null;
-            Map<HRegionLocation, MultiAction<Row>> actionsByServer =
-                Collections.singletonMap(addr, actions);
-            try {
-              HConnectionManager.ServerErrorTracker errorsByServer =
-                  new HConnectionManager.ServerErrorTracker(1, 10);
-              ap.sendMultiAction(retainedActions, actionsByServer, 10, 
errorsByServer);
-              ap.waitUntilDone();
-
-              if (ap.hasError()) {
-                throw ap.getErrors();
-              }
-            } catch (IOException e) {
-              LOG.debug("Caught some exceptions " + e
-                  + " when flushing puts to region server " + 
addr.getHostnamePort());
-            } finally {
-              // mutate list so that it is empty for complete success, or
-              // contains only failed records
-              // results are returned in the same order as the requests in list
-              // walk the list backwards, so we can remove from list without
-              // impacting the indexes of earlier members
-              for (int i = 0; i < results.size(); i++) {
-                if (results.get(i) == null) {
-                  if (failed == null) {
-                    failed = new ArrayList<PutStatus>();
-                  }
-                  failed.add(processingList.get(i));
+          } finally {
+            // mutate list so that it is empty for complete success, or
+            // contains only failed records
+            // results are returned in the same order as the requests in list
+            // walk the list backwards, so we can remove from list without
+            // impacting the indexes of earlier members
+            for (int i = 0; i < results.size(); i++) {
+              if (results.get(i) == null) {
+                if (failed == null) {
+                  failed = new ArrayList<PutStatus>();
                 }
+                failed.add(processingList.get(i));
+              } else {
+                failedCount--;
               }
             }
+          }
 
-            if (failed != null) {
-              // Resubmit failed puts
-              for (PutStatus putStatus : processingList) {
-                if (!resubmitFailedPut(putStatus, this.addr)) {
-                  failedCount++;
-                }
+          if (failed != null) {
+            // Resubmit failed puts
+            for (PutStatus putStatus : processingList) {
+              if (resubmitFailedPut(putStatus, this.addr)) {
+                failedCount--;
               }
-              // Update the totalFailedCount
-              this.totalFailedPutCount.addAndGet(failedCount);
-            }
-            
-            elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
-            // Update latency counters
-            averageLatency.add(elapsed);
-            if (elapsed > maxLatency.get()) {
-              maxLatency.set(elapsed);
             }
-            
-            // Log some basic info
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Processed " + currentProcessingPutCount
-                  + " put requests for " + addr.getHostnamePort() + " and "
-                  + failedCount + " failed" + ", latency for this send: "
-                  + elapsed);
-            }
-
-            // Reset the current processing put count
-            currentProcessingPutCount.set(0);
+            // Update the totalFailedCount
+            this.totalFailedPutCount.addAndGet(failedCount);
           }
-
-          // Sleep for a while
-          if (elapsed == start) {
-            elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
+          
+          long elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
+          // Update latency counters
+          averageLatency.add(elapsed);
+          if (elapsed > maxLatency.get()) {
+            maxLatency.set(elapsed);
           }
-          if (elapsed < frequency) {
-            Thread.sleep(frequency - elapsed);
+
+          // Log some basic info
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Processed " + currentProcessingCount + " put requests 
for "
+                + addr.getHostnamePort() + " and " + failedCount + " failed"
+                + ", latency for this send: " + elapsed);
           }
-        } catch (Exception e) {
-          // Log all the exceptions and move on
-          LOG.debug("Caught some exceptions " + e
-              + " when flushing puts to region server "
-                + addr.getHostnamePort(), e);
+  
+          // Reset the current processing put count
+          currentProcessingCount.set(0);
+        }
+      } catch (RuntimeException e) {
+        // To make findbugs happy
+        // Log all the exceptions and move on
+        LOG.debug("Caught some exceptions " + e + " when flushing puts to 
region server "
+              + addr.getHostnamePort(), e);
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
         }
+        // Log all the exceptions and move on
+        LOG.debug("Caught some exceptions " + e + " when flushing puts to 
region server "
+              + addr.getHostnamePort(), e);
+      } finally {
+        // Update the totalFailedCount
+        this.totalFailedPutCount.addAndGet(failedCount);
       }
     }
 

Reply via email to