PHOENIX-1795 Set handlerCount, numQueues and maxQueueLength of index and metadata queues correctly
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2cf44c3 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2cf44c3 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2cf44c3 Branch: refs/heads/calcite Commit: e2cf44c3c22f8789c4bd1fe529f07f2d6e45e482 Parents: d05d7c8 Author: Thomas <[email protected]> Authored: Mon Mar 30 15:21:44 2015 -0700 Committer: Thomas <[email protected]> Committed: Tue Mar 31 13:34:17 2015 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/QueryDatabaseMetaDataIT.java | 1 + .../org/apache/phoenix/rpc/PhoenixClientRpcIT.java | 17 ++++------------- .../org/apache/phoenix/rpc/PhoenixServerRpcIT.java | 15 ++++++--------- .../hadoop/hbase/ipc/PhoenixRpcScheduler.java | 16 +++++++++++----- .../org/apache/phoenix/query/QueryServices.java | 4 ++++ .../apache/phoenix/query/QueryServicesOptions.java | 10 ++++++++-- .../org/apache/phoenix/jdbc/PhoenixTestDriver.java | 5 ++--- .../java/org/apache/phoenix/query/BaseTest.java | 10 ++++++++++ 8 files changed, 46 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java index 44086d7..c9ec0ce 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java @@ -682,6 +682,7 @@ public class QueryDatabaseMetaDataIT extends BaseClientManagedTimeIT { descriptor.addFamily(columnDescriptor); } admin.createTable(descriptor); + admin.close(); long ts = nextTimestamp(); Properties props = new Properties(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java index deb14db..0c61b55 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java @@ -17,13 +17,11 @@ import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.util.Collections; import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.ipc.CallRunner; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; -import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; import org.apache.phoenix.util.PropertiesUtil; @@ -35,8 +33,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import com.google.common.collect.Maps; - public class PhoenixClientRpcIT extends BaseOwnClusterHBaseManagedTimeIT { private static final String SCHEMA_NAME = "S"; @@ -45,15 +41,10 @@ public class PhoenixClientRpcIT extends BaseOwnClusterHBaseManagedTimeIT { @BeforeClass public static void doSetup() throws Exception { - Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2); - serverProps.put(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - TestPhoenixIndexRpcSchedulerFactory.class.getName()); - serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName()); - Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); - clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName()); + Map<String, String> serverProps = Collections.singletonMap(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + TestPhoenixIndexRpcSchedulerFactory.class.getName()); NUM_SLAVES_BASE = 2; - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet() - .iterator())); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS); } @AfterClass http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java index b04f636..dbcd7ac 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -37,7 +38,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.ipc.CallRunner; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -54,8 +54,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import com.google.common.collect.Maps; - public class PhoenixServerRpcIT extends BaseOwnClusterHBaseManagedTimeIT { private static final String SCHEMA_NAME = "S"; @@ -65,12 +63,11 @@ public class PhoenixServerRpcIT extends BaseOwnClusterHBaseManagedTimeIT { @BeforeClass public static void doSetup() throws Exception { - Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2); - serverProps.put(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - TestPhoenixIndexRpcSchedulerFactory.class.getName()); - serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName()); - Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); - clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, RpcControllerFactory.class.getName()); + Map<String, String> serverProps = Collections.singletonMap(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + TestPhoenixIndexRpcSchedulerFactory.class.getName()); + // use the standard rpc controller for client rpc, so that we can isolate server rpc and ensure they use the correct queue + Map<String, String> clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + RpcControllerFactory.class.getName()); NUM_SLAVES_BASE = 2; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java index e721271..362e2cc 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import com.google.common.annotations.VisibleForTesting; @@ -43,15 +45,19 @@ public class PhoenixRpcScheduler extends RpcScheduler { public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority) { // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 - int maxQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT); + int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT); + int maxIndexQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + int maxMetadataQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); - int numQueues = Math.max(1, Math.round(callQueuesHandlersFactor)); + int numIndexQueues = Math.max(1, Math.round(indexHandlerCount * callQueuesHandlersFactor)); + int numMetadataQueues = Math.max(1, Math.round(metadataHandlerCount * callQueuesHandlersFactor)); this.indexPriority = indexPriority; this.metadataPriority = metadataPriority; this.delegate = delegate; - this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", 1, numQueues, maxQueueLength); - this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", 1, numQueues, maxQueueLength); + this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, numIndexQueues, maxIndexQueueLength); + this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, numMetadataQueues, maxMetadataQueueLength); } @Override @@ -120,4 +126,4 @@ public class PhoenixRpcScheduler extends RpcScheduler { } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 65f6acf..7a911e7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -152,6 +152,10 @@ public interface QueryServices extends SQLCloseable { public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells"; public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls"; public static final String METRICS_ENABLED = "phoenix.query.metrics.enabled"; + + // rpc queue configs + public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count"; + public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count"; /** * Get executor service used for parallel scans http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 97040d2..3561663 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -67,8 +67,10 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.trace.util.Tracing; @@ -147,6 +149,8 @@ public class QueryServicesOptions { public static final int DEFAULT_INDEX_PRIORITY = 1000; public static final int DEFAULT_METADATA_PRIORITY = 2000; public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true; + public static final int DEFAULT_INDEX_HANDLER_COUNT = 30; + public static final int DEFAULT_METADATA_HANDLER_COUNT = 30; public static final int DEFAULT_TRACING_PAGE_SIZE = 100; /** @@ -185,6 +189,8 @@ public class QueryServicesOptions { public static final boolean DEFAULT_AUTO_COMMIT = false; public static final boolean DEFAULT_IS_METRICS_ENABLED = true; + private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName(); + private final Configuration config; private QueryServicesOptions(Configuration config) { @@ -237,7 +243,7 @@ public class QueryServicesOptions { .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK) .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK) .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED) - .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName()); + .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY) ; // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set @@ -508,8 +514,8 @@ public class QueryServicesOptions { public QueryServicesOptions setDelayInMillisForSchemaChangeCheck(long delayInMillis) { config.setLong(DELAY_FOR_SCHEMA_UPDATE_CHECK, delayInMillis); return this; - } + } public QueryServicesOptions setMetricsEnabled(boolean flag) { config.setBoolean(METRICS_ENABLED, flag); return this; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java index 0d3c461..d4956ee 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java @@ -55,14 +55,13 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { private boolean closed = false; public PhoenixTestDriver() { - this.overrideProps = ReadOnlyProps.EMPTY_PROPS; - queryServices = new QueryServicesTestImpl(getDefaultProps()); + this(ReadOnlyProps.EMPTY_PROPS); } // For tests to override the default configuration public PhoenixTestDriver(ReadOnlyProps props) { overrideProps = props; - queryServices = new QueryServicesTestImpl(getDefaultProps(),overrideProps); + queryServices = new QueryServicesTestImpl(getDefaultProps(), overrideProps); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 748ad19..e5884c3 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -118,7 +118,12 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.LocalIndexMerger; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseClientManagedTimeIT; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; @@ -458,6 +463,8 @@ public abstract class BaseTest { private static final String ORG_ID = "00D300000000XHP"; protected static int NUM_SLAVES_BASE = 1; + private static final String DEFAULT_SERVER_RPC_CONTROLLER_FACTORY = ServerRpcControllerFactory.class.getName(); + private static final String DEFAULT_RPC_SCHEDULER_FACTORY = PhoenixRpcSchedulerFactory.class.getName(); protected static String getZKClientPort(Configuration conf) { return conf.get(QueryServices.ZOOKEEPER_PORT_ATTRIB); @@ -613,6 +620,9 @@ public abstract class BaseTest { } //no point doing sanity checks when running tests. conf.setBoolean("hbase.table.sanity.checks", false); + // set the server rpc controller and rpc scheduler factory, used to configure the cluster + conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_SERVER_RPC_CONTROLLER_FACTORY); + conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, DEFAULT_RPC_SCHEDULER_FACTORY); // override any defaults based on overrideProps for (Entry<String,String> entry : overrideProps) {
