PHOENIX-4685 Properly handle connection caching for Phoenix inside RegionServers (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dde1054f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dde1054f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dde1054f

Branch: refs/heads/4.x-cdh5.12
Commit: dde1054f34fe1784447307600d0949f35e6f475a
Parents: 8ed7eb0
Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Authored: Tue May 8 09:26:41 2018 +0530
Committer: James Taylor <jtay...@salesforce.com>
Committed: Wed May 9 13:27:36 2018 -0700

----------------------------------------------------------------------
 .../DelegateRegionCoprocessorEnvironment.java   |   7 +-
 .../UngroupedAggregateRegionObserver.java       |  14 +-
 .../org/apache/phoenix/hbase/index/Indexer.java |  19 +--
 .../hbase/index/write/IndexWriterUtils.java     |  27 +---
 .../index/PhoenixTransactionalIndexer.java      |  18 +--
 .../org/apache/phoenix/util/ServerUtil.java     | 141 ++++++++++++++++---
 6 files changed, 142 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
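
At a glance, the change replaces per-coprocessor cloned Configurations and short-lived
HConnections with connections cached in ServerUtil and keyed by a new ConnectionType enum.
A minimal usage sketch follows (the ExampleCoprocessorSetup class and method are
hypothetical; the two-argument constructor and the enum constant are the ones added in the
hunks below):

    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
    import org.apache.phoenix.util.ServerUtil.ConnectionType;

    class ExampleCoprocessorSetup {
        // A coprocessor now states what kind of connection it needs; the delegate
        // environment derives the tuned Configuration and the shared, cached connection
        // behind the scenes (see the ServerUtil.java hunk below).
        static DelegateRegionCoprocessorEnvironment indexWriterEnv(RegionCoprocessorEnvironment env) {
            return new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
        }
    }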


http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
index 284d53c..a791f4a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
 /**
  * Class to encapsulate {@link RegionCoprocessorEnvironment} for phoenix coprocessors. Often we
@@ -44,10 +45,10 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn
     private RegionCoprocessorEnvironment delegate;
     private HTableFactory tableFactory;
 
-    public DelegateRegionCoprocessorEnvironment(Configuration config, RegionCoprocessorEnvironment delegate) {
-        this.config = config;
+    public DelegateRegionCoprocessorEnvironment(RegionCoprocessorEnvironment delegate, ConnectionType connectionType) {
+        this.config = ServerUtil.ConnectionFactory.getTypeSpecificConfiguration(connectionType, delegate.getConfiguration());
         this.delegate = delegate;
-        this.tableFactory = ServerUtil.getDelegateHTableFactory(this, config);
+        this.tableFactory = ServerUtil.getDelegateHTableFactory(this, connectionType);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 6bee65c..14213f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -144,6 +144,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -225,14 +226,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
             InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
 
-        compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
-        // lower the number of rpc retries, so we don't hang the compaction
-        compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-            e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
-                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
-        compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
-            e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
-                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
+        compactionConfig = ServerUtil.getCompactionConfig(e.getConfiguration());
 
         // For retries of index write failures, use the same # of retries as the rebuilder
         indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
@@ -984,7 +978,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     InternalScanner internalScanner = scanner;
                     try {
                         long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
-                        DelegateRegionCoprocessorEnvironment compactionConfEnv = new DelegateRegionCoprocessorEnvironment(compactionConfig, c.getEnvironment());
+                        DelegateRegionCoprocessorEnvironment compactionConfEnv =
+                                new DelegateRegionCoprocessorEnvironment(c.getEnvironment(),
+                                        ConnectionType.COMPACTION_CONNECTION);
                         StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
                             compactionConfEnv, table.getNameAsString(), clientTimeStamp,
                             store.getFamily().getName());
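
For reference, the ConnectionType.COMPACTION_CONNECTION passed above is resolved into a
compaction-tuned Configuration by the delegate environment. A short sketch of the
equivalent direct call, using only the helpers added in the ServerUtil.java hunk further
down (the CompactionConfigExample class is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.util.ServerUtil;
    import org.apache.phoenix.util.ServerUtil.ConnectionType;

    class CompactionConfigExample {
        static Configuration compactionConfig(Configuration serverConf) {
            // DelegateRegionCoprocessorEnvironment resolves COMPACTION_CONNECTION through
            // this call, which delegates to the new public ServerUtil.getCompactionConfig(conf):
            // a cloned config with lowered HBASE_CLIENT_RETRIES_NUMBER / HBASE_CLIENT_PAUSE
            // taken from the METADATA_WRITE_* query-service settings.
            return ServerUtil.ConnectionFactory.getTypeSpecificConfiguration(
                    ConnectionType.COMPACTION_CONNECTION, serverConf);
        }
    }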

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 1ef09fe..e4e6991 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -93,6 +93,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
@@ -220,25 +221,11 @@ public class Indexer extends BaseRegionObserver {
     
         this.builder = new IndexBuildManager(env);
         // Clone the config since it is shared
-        Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
-        /*
-         * Set the rpc controller factory so that the HTables used by IndexWriter would
-         * set the correct priorities on the remote RPC calls.
-         */
-        clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
-                InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
-        // lower the number of rpc retries.  We inherit config from HConnectionManager#setServerSideHConnectionRetries,
-        // which by default uses a multiplier of 10.  That is too many retries for our synchronous index writes
-        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-            env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
-                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
-        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration()
-            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
-        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
+        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
         // setup the actual index writer
         this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer");
         
-        this.rowLockWaitDuration = clonedConfig.getInt("hbase.rowlock.wait.duration",
+        this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
                 DEFAULT_ROWLOCK_WAIT_DURATION);
         this.lockManager = new LockManager();
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
index 0d3004f..ef53b9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
 public class IndexWriterUtils {
 
@@ -50,9 +51,9 @@ public class IndexWriterUtils {
   * threads. Currently, HBase doesn't support a custom thread-pool to back the HTable via the
    * coprocesor hooks, so we can't modify this behavior.
    */
-  private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
+  public static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
       "index.writer.threads.pertable.max";
-  private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
+  public static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
 
  /** Configuration key that HBase uses to set the max number of threads for an HTable */
   public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
@@ -79,19 +80,7 @@ public class IndexWriterUtils {
   }
 
     public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
-        // create a simple delegate factory, setup the way we need
-        Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration());
-        setHTableThreads(conf);
-        return ServerUtil.getDelegateHTableFactory(env, conf);
-    }
-
-    private static void setHTableThreads(Configuration conf) {
-        // set the number of threads allowed per table.
-        int htableThreads =
-                conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
-                    IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
-        LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
-        IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
+        return ServerUtil.getDelegateHTableFactory(env, ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS);
     }
 
     /**
@@ -99,12 +88,8 @@ public class IndexWriterUtils {
      * instead to avoid tying up the handler
      */
     public static HTableFactory getNoRetriesHTableFactory(CoprocessorEnvironment env) {
-        Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration());
-        setHTableThreads(conf);
-        // note in HBase 2+, numTries = numRetries + 1
-        // in prior versions, numTries = numRetries
-        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
-        return ServerUtil.getDelegateHTableFactory(env, conf);
+        return ServerUtil.getDelegateHTableFactory(env,
+            ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 610ea44..0893bf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -55,6 +55,7 @@ import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
 
 /**
  * Do all the work of managing local index updates for a transactional table from a single coprocessor. Since the transaction
@@ -90,22 +91,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         Configuration conf = e.getConfiguration();
         String serverName = env.getRegionServerServices().getServerName().getServerName();
         codec = new PhoenixIndexCodec(conf, env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), env.getRegionInfo().getTable().getName());
-        // Clone the config since it is shared
-        Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
-        /*
-         * Set the rpc controller factory so that the HTables used by IndexWriter would
-         * set the correct priorities on the remote RPC calls.
-         */
-        clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
-                InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
-        // lower the number of rpc retries.  We inherit config from HConnectionManager#setServerSideHConnectionRetries,
-        // which by default uses a multiplier of 10.  That is too many retries for our synchronous index writes
-        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-            env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
-                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
-        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration()
-            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
-        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
+        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
         // setup the actual index writer
         // For transactional tables, we keep the index active upon a write failure
         // since we have the all versus none behavior for transactions. Also, we

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dde1054f/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 4b3cc43..2dab076 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -17,11 +17,17 @@
  */
 package org.apache.phoenix.util;
 
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -35,12 +41,15 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.CoprocessorHConnection;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -52,8 +61,13 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
 
 
 @SuppressWarnings("deprecation")
@@ -269,12 +283,12 @@ public class ServerUtil {
                     endKey) < 0));
     }
 
-    public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, Configuration conf) {
+    public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, ConnectionType connectionType) {
         if (env instanceof RegionCoprocessorEnvironment) {
             RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
             RegionServerServices services = e.getRegionServerServices();
             if (services instanceof HRegionServer) {
-                return new CoprocessorHConnectionTableFactory(conf, (HRegionServer) services);
+                return new CoprocessorHConnectionTableFactory(env.getConfiguration(), (HRegionServer) services, connectionType);
             }
         }
         return new CoprocessorHTableFactory(env);
@@ -286,44 +300,133 @@ public class ServerUtil {
      * https://issues.apache.org/jira/browse/HBASE-18359
      */
     public static class CoprocessorHConnectionTableFactory implements HTableFactory {
-        @GuardedBy("CoprocessorHConnectionTableFactory.this")
-        private HConnection connection;
         private final Configuration conf;
         private final HRegionServer server;
+        private final ConnectionType connectionType;
 
-        CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server) {
+        CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server, ConnectionType connectionType) {
             this.conf = conf;
             this.server = server;
+            this.connectionType = connectionType;
         }
 
-        private synchronized HConnection getConnection(Configuration conf) throws IOException {
-            if (connection == null || connection.isClosed()) {
-                connection = new CoprocessorHConnection(conf, server);
-            }
-            return connection;
+        private ClusterConnection getConnection() throws IOException {
+            return ConnectionFactory.getConnection(connectionType, conf, server);
         }
 
         @Override
         public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
-            return getConnection(conf).getTable(tablename.copyBytesIfNecessary());
+            return getConnection().getTable(tablename.copyBytesIfNecessary());
         }
 
         @Override
         public synchronized void shutdown() {
-            try {
-                if (connection != null && !connection.isClosed()) {
-                    connection.close();
-                }
-            } catch (Throwable e) {
-                LOG.warn("Error while trying to close the HConnection used by 
CoprocessorHConnectionTableFactory", e);
-            }
+            // We need not close the cached connections as they are shared 
across the server.
         }
 
         @Override
         public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool)
                 throws IOException {
-            return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool);
+            return getConnection().getTable(tablename.copyBytesIfNecessary(), pool);
+        }
+    }
+
+    public static enum ConnectionType {
+        COMPACTION_CONNECTION,
+        INDEX_WRITER_CONNECTION,
+        INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS,
+        INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES,
+        DEFAULT_SERVER_CONNECTION;
+    }
+
+    public static class ConnectionFactory {
+        
+        private static Map<ConnectionType, ClusterConnection> connections =
+                new ConcurrentHashMap<ConnectionType, ClusterConnection>();
+
+        public static ClusterConnection getConnection(final ConnectionType connectionType, final Configuration conf, final HRegionServer server) throws IOException {
+            ClusterConnection connection = null;
+            if((connection = connections.get(connectionType)) == null) {
+                synchronized (CoprocessorHConnectionTableFactory.class) {
+                    if(connections.get(connectionType) == null) {
+                        connection = new CoprocessorHConnection(conf, server);
+                        connections.put(connectionType, connection);
+                        return connection;
+                    }
+                }
+            }
+            return connection;
         }
+
+        public static Configuration getTypeSpecificConfiguration(ConnectionType connectionType, Configuration conf) {
+            switch (connectionType) {
+            case COMPACTION_CONNECTION:
+                return getCompactionConfig(conf);
+            case DEFAULT_SERVER_CONNECTION:
+                return conf;
+            case INDEX_WRITER_CONNECTION:
+                return getIndexWriterConnection(conf);
+            case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS:
+                return getIndexWriterConfigurationWithCustomThreads(conf);
+            case INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS_NO_RETRIES:
+                return 
getNoRetriesIndexWriterConfigurationWithCustomThreads(conf);
+            default:
+                return conf;
+            }
+        }
+    }
+
+    public static Configuration getCompactionConfig(Configuration conf) {
+        Configuration compactionConfig = PropertiesUtil.cloneConfig(conf);
+        // lower the number of rpc retries, so we don't hang the compaction
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            conf.getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
+            conf.getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
+        return compactionConfig;
+    }
+
+    public static Configuration getIndexWriterConnection(Configuration conf) {
+        Configuration clonedConfig = PropertiesUtil.cloneConfig(conf);
+        /*
+         * Set the rpc controller factory so that the HTables used by IndexWriter would
+         * set the correct priorities on the remote RPC calls.
+         */
+        clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+                InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+        // lower the number of rpc retries.  We inherit config from HConnectionManager#setServerSideHConnectionRetries,
+        // which by default uses a multiplier of 10.  That is too many retries for our synchronous index writes
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            conf.getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
+                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, conf
+            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
+        return clonedConfig;
+    }
+
+    public static Configuration getIndexWriterConfigurationWithCustomThreads(Configuration conf) {
+        Configuration clonedConfig = PropertiesUtil.cloneConfig(conf);
+        setHTableThreads(clonedConfig);
+        return clonedConfig;
+    }
+
+    private static void setHTableThreads(Configuration conf) {
+        // set the number of threads allowed per table.
+        int htableThreads =
+                conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
+                    IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
+        IndexManagementUtil.setIfNotSet(conf, IndexWriterUtils.HTABLE_THREAD_KEY, htableThreads);
+    }
+    
+    public static Configuration getNoRetriesIndexWriterConfigurationWithCustomThreads(Configuration conf) {
+        Configuration clonedConf = getIndexWriterConfigurationWithCustomThreads(conf);
+        // note in HBase 2+, numTries = numRetries + 1
+        // in prior versions, numTries = numRetries
+        clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+        return clonedConf;
+
     }
 
 }
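
The heart of the change is the per-type connection cache shown in the ConnectionFactory
hunk above: one shared connection per ConnectionType, created lazily under a lock and
never closed by the per-coprocessor table factories (hence the no-op shutdown()). A
standalone, hedged illustration of that pattern with placeholder types, not the committed
code:

    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class ConnectionCacheSketch {
        // Placeholders: the commit keys org.apache.hadoop.hbase.client.ClusterConnection
        // instances (built via CoprocessorHConnection) by ServerUtil.ConnectionType.
        enum ConnectionType { COMPACTION, INDEX_WRITER, DEFAULT }
        interface Connection { }

        private static final Map<ConnectionType, Connection> CONNECTIONS =
                new ConcurrentHashMap<ConnectionType, Connection>();

        static Connection getConnection(ConnectionType type) throws IOException {
            Connection conn = CONNECTIONS.get(type);         // lock-free fast path
            if (conn == null) {
                synchronized (ConnectionCacheSketch.class) { // only taken on a cache miss
                    conn = CONNECTIONS.get(type);            // re-check under the lock
                    if (conn == null) {
                        conn = createConnection(type);       // e.g. new CoprocessorHConnection(conf, server)
                        CONNECTIONS.put(type, conn);
                    }
                }
            }
            return conn;
        }

        private static Connection createConnection(ConnectionType type) throws IOException {
            return new Connection() { };                     // placeholder creation
        }
    }

Because the cached connections are shared by every coprocessor on the region server,
CoprocessorHConnectionTableFactory.shutdown() above intentionally stops closing them.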
