Author: liyin
Date: Fri Mar 14 18:09:43 2014
New Revision: 1577638

URL: http://svn.apache.org/r1577638
Log:
[master] Stabilize async client with trunk

Author: fan

Summary:
Original diff: https://phabricator.fb.com/D1192422

1. Make async APIs work with header protocol feature.
2. Remove the connection pooling switch because we probably won't need to 
disable it in the future.
3. Tweaks to avoid creating unnecessary connections. Skipping the connection 
refresh is applied only to the async path. Will carefully review and fix bad 
logic in the sync path in the future.
4. Fix thread leaking problem in HTableAsync.

Other things to mention:
Async client in HTable is disabled by default in this diff for perf testing of 
sync client.
Pool exhausted exception is also eliminated on osh7.

Test Plan:
TestSimpleOperations, TestHeaderSendReceive
Async client runs stably on osh7 for more than 3 days

Reviewers: adela, daviddeng, gauravm, manukranthk

Reviewed By: manukranthk

CC: hbase-dev@

Differential Revision: https://phabricator.fb.com/D1210349

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java
    
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHeaderSendReceive.java

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java 
(original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java 
Fri Mar 14 18:09:43 2014
@@ -943,6 +943,8 @@ public final class HConstants {
   public static final boolean HTABLE_ASYNC_CALLS_DEFAULT = false;
 
   public static final int DEFAULT_HTABLE_ASYNC_CORE_THREADS = 100;
+  public static final int DEFAULT_HTABLE_ASYNC_MAX_THREADS = 1000;
+  public static final int DEFAULT_HTABLE_ASYNC_KEEPALIVE_SECONDS = 60;
 
   // These are the IO priority values for various regionserver operations. Note
   // that these are priorities relative to each other. See the man page for

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
(original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
Fri Mar 14 18:09:43 2014
@@ -102,7 +102,7 @@ public class HTable implements HTableInt
 
   public void initHTableAsync() throws IOException {
     if (doAsync && hta == null) {
-      hta = new HTableAsync(configuration, tableName);
+      hta = new HTableAsync(this);
     }
   }
 
@@ -192,7 +192,27 @@ public class HTable implements HTableInt
         HConstants.HTABLE_ASYNC_CALLS_DEFAULT)
         && configuration.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT,
             HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT);
-    
HBaseThriftRPC.setUsePooling(conf.getBoolean("hbase.client.useconnectionpooling",
 true));
+  }
+
+  /**
+   * Shallow copy constructor
+   *
+   * @param t HTable instance to copy from
+   */
+  public HTable(HTable t) {
+    this.tableName = t.tableName;
+    this.scannerTimeout = t.scannerTimeout;
+    this.connection = t.connection;
+    this.configuration = t.configuration;
+    this.writeBufferSize = t.writeBufferSize;
+    this.clearBufferOnFail = t.clearBufferOnFail;
+    this.autoFlush = t.autoFlush;
+    this.currentWriteBufferSize = t.currentWriteBufferSize;
+    this.scannerCaching = t.scannerCaching;
+    this.maxKeyValueSize = t.maxKeyValueSize;
+    this.options = t.options;
+    this.recordClientContext = t.recordClientContext;
+    this.maxScannerResultSize = t.maxScannerResultSize;
   }
 
   @Override

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java
 Fri Mar 14 18:09:43 2014
@@ -24,6 +24,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -43,10 +46,15 @@ import org.apache.hadoop.hbase.util.Daem
  */
 public class HTableAsync extends HTable implements HTableAsyncInterface {
 
-  //TODO: decide what is a good number of core threads. Max thread number 
seems unconfigurable.
-  private final ListeningScheduledExecutorService executorService =
-      MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(
-          HConstants.DEFAULT_HTABLE_ASYNC_CORE_THREADS, new 
DaemonThreadFactory("htable-async-thread-")));
+  private static final ListeningScheduledExecutorService executorService;
+  static {
+    ScheduledExecutorService executor = Executors.newScheduledThreadPool(
+        HConstants.DEFAULT_HTABLE_ASYNC_CORE_THREADS, new 
DaemonThreadFactory("htable-async-thread-"));
+    
((ThreadPoolExecutor)executor).setMaximumPoolSize(HConstants.DEFAULT_HTABLE_ASYNC_MAX_THREADS);
+    ((ThreadPoolExecutor)executor).setKeepAliveTime(
+        HConstants.DEFAULT_HTABLE_ASYNC_KEEPALIVE_SECONDS, TimeUnit.SECONDS);
+    executorService = MoreExecutors.listeningDecorator(executor);
+  }
 
   private HConnectionParams hConnectionParams;
 
@@ -78,6 +86,12 @@ public class HTableAsync extends HTable 
     this.hConnectionParams = HConnectionParams.getInstance(conf);
   }
 
+  public HTableAsync(HTable t) {
+    super(t);
+
+    this.hConnectionParams = HConnectionParams.getInstance(getConfiguration());
+  }
+
   /**
    * {@inheritDoc}
    */

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
 Fri Mar 14 18:09:43 2014
@@ -120,8 +120,10 @@ public abstract class ServerCallable<T> 
     return tableName;
   }
 
-  public void refreshServerConnection(Exception e) throws IOException {
-    ((HBaseToThriftAdapter)server).refreshConnectionAndThrowIOException(e);
+  public void cleanUpServerConnection(Exception e) throws IOException {
+    if (server instanceof HBaseToThriftAdapter) {
+      ((HBaseToThriftAdapter)server).cleanUpServerConnection(e);
+    }
   }
 
   public void updateFailureInfoForServer(boolean didTry, boolean 
couldNotCommunicate) {
@@ -132,4 +134,10 @@ public abstract class ServerCallable<T> 
   public void handleThrowable(Throwable t, MutableBoolean 
couldNotCommunicateWithServer) throws Exception {
     ((HConnectionManager.TableServers)connection).handleThrowable(t, this, 
couldNotCommunicateWithServer);
   }
+
+  public void readHeader() {
+    if (server instanceof HBaseToThriftAdapter) {
+      ((HBaseToThriftAdapter)server).readHeader();
+    }
+  }
 }

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCOptions.java
 Fri Mar 14 18:09:43 2014
@@ -22,7 +22,7 @@ public class HBaseRPCOptions implements 
 
   // this will be used as profiling data in htable so it's possible to
   // set it after receiving profiling data. do not need to serialize this.
-  public ProfilingData profilingResult = null;
+  public volatile ProfilingData profilingResult = null;
 
   public HBaseRPCOptions () {}
 

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java
 Fri Mar 14 18:09:43 2014
@@ -54,11 +54,6 @@ public class HBaseThriftRPC {
 
   private static Configuration metaConf = null;
 
-  private static boolean USE_POOLING = true;
-  private static ConcurrentHashMap<InetSocketAddress, 
ThriftClientObjectFactory> factoryMap =
-      new ConcurrentHashMap<>();
-  private static ThriftClientManager sharedManager = new ThriftClientManager();
-
   public static void clearAll() throws Exception {
     metaConf = null;
     isMeta.get().clear();
@@ -84,20 +79,6 @@ public class HBaseThriftRPC {
       InetSocketAddress addr,
       Configuration conf,
       Class<? extends ThriftClientInterface> clazz) throws IOException {
-    if (!USE_POOLING) {
-      ThriftClientObjectFactory factory = factoryMap.get(addr);
-      if (factory == null) {
-        ThriftClientManager manager = new ThriftClientManager();
-        factory = new ThriftClientObjectFactory(addr, clazz, manager, conf);
-        factoryMap.putIfAbsent(addr, factory);
-      }
-      try {
-        return new Pair<>(factory.makeObject(), sharedManager);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-
     conf = checkIfInMeta(conf);
     ThriftClientCache clientsForConf = CLIENT_CACHE.get(conf);
     if (clientsForConf == null) {
@@ -111,7 +92,7 @@ public class HBaseThriftRPC {
       }
     }
     try {
-      ThriftClientInterface client = null;
+      ThriftClientInterface client;
       try {
         client = clientsForConf.getClient(addr, clazz);
       } catch (Exception e) {
@@ -158,14 +139,36 @@ public class HBaseThriftRPC {
       InetSocketAddress inetSocketAddress,
       Configuration conf, ThriftClientInterface thriftClient,
       Class<? extends ThriftClientInterface> clientInterface) throws 
IOException {
-    if (!USE_POOLING) {
-      try {
+    conf = checkIfInMeta(conf);
+    ThriftClientCache clientsForConf = CLIENT_CACHE.get(conf);
+    try {
+      if (clientsForConf == null) {
+        LOG.error("Client cache pool for current configuration is null.");
         thriftClient.close();
-      } catch (Exception e) {
-        LOG.warn("Unable to close client because: ", e);
+      } else {
+        clientsForConf.close(inetSocketAddress, clientInterface, thriftClient);
       }
-      return new Pair<>(null, null);
+      LOG.debug("Refreshing client connection due to an error in the channel. 
Remote address: " + inetSocketAddress);
+      return getClientWithoutWrapper(inetSocketAddress, conf, clientInterface);
+    } catch (Exception e) {
+      throw new IOException(e);
     }
+  }
+
+  /**
+   * Clean up current connection.
+   *
+   * @param inetSocketAddress
+   * @param conf
+   * @param thriftClient
+   * @param clientInterface
+   * @throws IOException
+   */
+  protected static void cleanUpConnection(
+      InetSocketAddress inetSocketAddress,
+      Configuration conf,
+      ThriftClientInterface thriftClient,
+      Class<? extends ThriftClientInterface> clientInterface) throws 
IOException {
     conf = checkIfInMeta(conf);
     ThriftClientCache clientsForConf = CLIENT_CACHE.get(conf);
     try {
@@ -175,8 +178,7 @@ public class HBaseThriftRPC {
       } else {
         clientsForConf.close(inetSocketAddress, clientInterface, thriftClient);
       }
-      LOG.debug("Refreshing client connection due to an error in the channel. 
Remote address: " + inetSocketAddress);
-      return getClientWithoutWrapper(inetSocketAddress, conf, clientInterface);
+      LOG.debug("Clean up connection due to an error in the channel");
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -187,6 +189,8 @@ public class HBaseThriftRPC {
     if (!isMeta.get().isEmpty() && isMeta.get().peek()) {
       if (metaConf == null) {
         metaConf = (HBaseConfiguration.create(inputConf));
+        // Make metaConf different from inputConf
+        metaConf.setStrings("metaConf", "yes");
       }
       conf = metaConf;
     }
@@ -196,18 +200,6 @@ public class HBaseThriftRPC {
   public static void putBackClient(ThriftClientInterface server,
       InetSocketAddress addr, Configuration conf,
       Class<? extends ThriftClientInterface> clazz) throws Exception {
-    if (!USE_POOLING) {
-      try {
-        // FIXME
-        // When using async, without connection pooling, because as per @fan,
-        // we would just close the connection before the async call finishes.
-        server.close();
-      } catch (Exception e) {
-        LOG.warn("Could not close connection to " + addr.toString(), e);
-      }
-      return;
-    }
-
     conf = checkIfInMeta(conf);
     ThriftClientCache clientsForConf = CLIENT_CACHE.get(conf);
     if (clientsForConf == null) {
@@ -215,8 +207,4 @@ public class HBaseThriftRPC {
     }
     clientsForConf.putBackClient(server, addr, clazz);
   }
-
-  public static void setUsePooling(boolean b) {
-    USE_POOLING = b;
-  }
 }

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
 Fri Mar 14 18:09:43 2014
@@ -118,6 +118,7 @@ public class HBaseToThriftAdapter implem
     } else if (e instanceof RuntimeTApplicationException) {
       throw new RuntimeException(e);
     } else {
+      //TODO: creating a new connection is unnecessary. We should replace it 
with cleanUpConnection later
       Pair<ThriftClientInterface, ThriftClientManager> interfaceAndManager = 
HBaseThriftRPC
           .refreshConnection(this.addr, this.conf, this.connection, 
this.clazz);
       this.connection = (ThriftHRegionInterface) 
interfaceAndManager.getFirst();
@@ -126,6 +127,26 @@ public class HBaseToThriftAdapter implem
     }
   }
 
+  /**
+   * In contrast to refreshConnectionAndThrowIOException(), it tries to clean
+   * up failed connection from the pool without creating new ones, because 
that's
+   * unnecessary.
+   *
+   * @param e
+   * @throws IOException
+   */
+  public void cleanUpServerConnection(Exception e) throws IOException {
+    if (e instanceof TApplicationException) {
+      throw new RuntimeException(e);
+    } else if (e instanceof RuntimeTApplicationException) {
+      throw new RuntimeException(e);
+    } else {
+      HBaseThriftRPC.cleanUpConnection(this.addr, this.conf, this.connection, 
this.clazz);
+      this.connection = null;
+      this.clientManager = null;
+    }
+  }
+
   private void refreshConnectionAndThrowRuntimeException(
       Exception e) {
     try {
@@ -213,8 +234,7 @@ public class HBaseToThriftAdapter implem
       if (this.useHeaderProtocol) {
         readHeader();
       }
-      HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf,
-        this.clazz);
+      HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, 
this.clazz);
       this.connection = null;
       this.clientManager = null;
     } catch (Exception e) {
@@ -222,6 +242,14 @@ public class HBaseToThriftAdapter implem
     }
   }
 
+  private void putBackClient() {
+    try {
+      HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, 
this.clazz);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private void handleIOException(Exception e) throws IOException {
     if (e instanceof IOException) {
       throw (IOException) e;
@@ -332,7 +360,7 @@ public class HBaseToThriftAdapter implem
     try {
       return connection.getClosestRowBeforeAsync(regionName, row, family);
     } finally {
-      postProcess();
+      putBackClient();
     }
   }
 
@@ -557,7 +585,7 @@ public class HBaseToThriftAdapter implem
     try {
       return connection.getAsync(regionName, get);
     } finally {
-      postProcess();
+      putBackClient();
     }
   }
 
@@ -632,7 +660,7 @@ public class HBaseToThriftAdapter implem
     try {
       return connection.deleteAsync(regionName, delete);
     } finally {
-      postProcess();
+      putBackClient();
     }
   }
 
@@ -755,7 +783,7 @@ public class HBaseToThriftAdapter implem
     } catch (IOException e) {
       return Futures.immediateFailedFuture(e);
     } finally {
-      postProcess();
+      putBackClient();
     }
   }
 
@@ -878,7 +906,7 @@ public class HBaseToThriftAdapter implem
     try {
       return connection.lockRowAsync(regionName, row);
     } finally {
-      postProcess();
+      putBackClient();
     }
   }
 
@@ -904,7 +932,7 @@ public class HBaseToThriftAdapter implem
     try {
       return connection.unlockRowAsync(regionName, lockId);
     } finally {
-      postProcess();
+      putBackClient();
     }
   }
 

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
 Fri Mar 14 18:09:43 2014
@@ -95,7 +95,7 @@ public class ThriftClientCacheWithConnec
    * @param clazz
    * @return
    */
-  public GenericObjectPool<ThriftClientInterface> createGenericObjectPool(
+  private GenericObjectPool<ThriftClientInterface> createGenericObjectPool(
       InetSocketAddress address, Class<? extends ThriftClientInterface> clazz) 
{
     ThriftClientObjectFactory factory = new ThriftClientObjectFactory(address,
         clazz, this.clientManager, this.conf);

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java
 Fri Mar 14 18:09:43 2014
@@ -72,7 +72,6 @@ public class SelfRetryingListenableFutur
 
   private int tries = 0;
   private List<Throwable> exceptions = new ArrayList<>();
-  private RegionOverloadedException roe = null;
 
   long callStartTime = 0;
   int serverRequestedRetries = 0;
@@ -123,7 +122,6 @@ public class SelfRetryingListenableFutur
     }
 
     tries++;
-    LOG.debug("Try number: " + tries);
 
     didTry = false;
     try {
@@ -152,7 +150,7 @@ public class SelfRetryingListenableFutur
         }
       }, executorService);
     } catch (Exception e) {
-      LOG.error("Cannot create upstream listenable future at the first place", 
e);
+      LOG.error("Cannot create upstream listenable future at the first place, 
try number: " + tries, e);
       handleException(e);
     }
   }
@@ -163,6 +161,7 @@ public class SelfRetryingListenableFutur
    * @param v Result from server
    */
   private void setSuccess(V v) {
+    callable.readHeader();
     downstream.set(v);
   }
 
@@ -190,12 +189,12 @@ public class SelfRetryingListenableFutur
       // Skip exception handling in TableServers and just fast fail to next 
retry.
       retryOrStop(t);
     } else {
-      LOG.debug("Other exception type, detecting if it's need to refresh 
connection", t);
+      LOG.debug("Other exception type, detecting if it's need to cleanup 
connection", t);
       if (!didTry) {
         // When the call to server is actually made,
-        //   try to refresh server connection if it's necessary.
+        //   try to clean up server connection if it's necessary.
         try {
-          callable.refreshServerConnection((Exception)t);
+          callable.cleanUpServerConnection((Exception) t);
         } catch (Exception e) {
           // Decide next step according to the original exception. Do not use 
the this exception
           //   which is wrapped by connection refreshing.
@@ -234,7 +233,7 @@ public class SelfRetryingListenableFutur
       // So there is no retry
       setFailure(t);
     } else if (t instanceof RegionOverloadedException) {
-      roe = (RegionOverloadedException)t;
+      RegionOverloadedException roe = (RegionOverloadedException)t;
       serverRequestedWaitTime = roe.getBackoffTimeMillis();
 
       // If server requests wait. We will wait for that time, and start

Modified: 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHeaderSendReceive.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHeaderSendReceive.java?rev=1577638&r1=1577637&r2=1577638&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHeaderSendReceive.java
 (original)
+++ 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHeaderSendReceive.java
 Fri Mar 14 18:09:43 2014
@@ -18,7 +18,8 @@
  */
 package org.apache.hadoop.hbase.thrift;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
 
@@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.HBaseTest
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableAsync;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.ipc.ProfilingData;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -69,28 +71,28 @@ public class TestHeaderSendReceive {
    * The client requests profiling
    */
   @Test
-  public void testProfilingData() throws IOException {
+  public void testProfilingData() throws Exception {
     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
     Put put = new Put(r1);
     put.add(FAMILY, null, value);
     ht.setProfiling(true);
     ht.put(put);
     ProfilingData pd = ht.getProfilingData();
-    assertTrue(pd != null);
+    assertNotNull(pd);
     System.out.println("profiling data after first put: " + pd);
 
     // disable profiling and check that we get no profiling data back
     ht.setProfiling(false);
     ht.put(put);
     pd = ht.getProfilingData();
-    assertTrue(pd == null);
+    assertNull(pd);
 
     put = new Put(r2);
     put.add(FAMILY, null, value);
     ht.setProfiling(true);
     ht.put(put);
     pd = ht.getProfilingData();
-    assertTrue(pd != null);
+    assertNotNull(pd);
     System.out.println("profiling data after second put: " + pd);
 
     // make a get
@@ -99,7 +101,14 @@ public class TestHeaderSendReceive {
     ht.get(get);
     pd = ht.getProfilingData();
     System.out.println("profiling data after get: " + pd);
-    assertTrue(pd != null);
+    assertNotNull(pd);
+
+    // test async get
+    ht.setProfiling(true);
+    ((HTableAsync)ht).getAsync(get).get();
+    pd = ht.getProfilingData();
+    System.out.println("profiling data after get: " + pd);
+    assertNotNull(pd);
   }
 
   /**
@@ -116,7 +125,7 @@ public class TestHeaderSendReceive {
     p.add(FAMILY, null, value);
     ht.put(p);
     ProfilingData pd = ht.getProfilingData();
-    assertTrue(pd != null);
+    assertNotNull(pd);
     System.out.println("profiling data: " + pd);
   }
 


Reply via email to