PHOENIX-1457 Use high priority queue for metadata endpoint calls
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0b49b28f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0b49b28f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0b49b28f Branch: refs/heads/4.3 Commit: 0b49b28f8c95ca2b5bb75cd115cad0bb3db4118c Parents: eeff9be Author: Thomas D'Silva <twdsi...@gmail.com> Authored: Tue Mar 24 17:17:44 2015 -0700 Committer: Thomas <tdsi...@salesforce.com> Committed: Thu Mar 26 21:32:01 2015 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/index/IndexHandlerIT.java | 12 +- .../phoenix/end2end/index/IndexQosIT.java | 240 ------------------- .../apache/phoenix/rpc/PhoenixClientRpcIT.java | 122 ++++++++++ .../apache/phoenix/rpc/PhoenixServerRpcIT.java | 235 ++++++++++++++++++ .../TestPhoenixIndexRpcSchedulerFactory.java | 58 +++++ .../hbase/ipc/PhoenixIndexRpcScheduler.java | 123 ---------- .../hadoop/hbase/ipc/PhoenixRpcScheduler.java | 123 ++++++++++ .../hbase/ipc/PhoenixRpcSchedulerFactory.java | 95 ++++++++ .../controller/ClientRpcControllerFactory.java | 60 +++++ .../ipc/controller/IndexRpcController.java | 51 ++++ .../ipc/controller/MetadataRpcController.java | 55 +++++ .../controller/ServerRpcControllerFactory.java | 62 +++++ .../index/IndexQosRpcControllerFactory.java | 82 ------- .../ipc/PhoenixIndexRpcSchedulerFactory.java | 91 ------- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 4 - .../org/apache/phoenix/query/QueryServices.java | 5 +- .../phoenix/query/QueryServicesOptions.java | 12 +- .../org/apache/phoenix/util/SchemaUtil.java | 7 - .../hbase/ipc/PhoenixIndexRpcSchedulerTest.java | 16 +- .../PhoenixIndexRpcSchedulerFactoryTest.java | 106 -------- .../PhoenixRpcSchedulerFactoryTest.java | 125 ++++++++++ .../java/org/apache/phoenix/query/BaseTest.java | 12 +- 22 files changed, 1017 insertions(+), 679 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java index 1507d6b..20a780a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexHandlerIT.java @@ -35,8 +35,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory; import org.apache.phoenix.hbase.index.TableName; import org.apache.phoenix.query.QueryServicesOptions; import org.junit.After; @@ -53,11 +53,11 @@ public class IndexHandlerIT { public static class CountingIndexClientRpcFactory extends RpcControllerFactory { - private IndexQosRpcControllerFactory delegate; + private ServerRpcControllerFactory delegate; public CountingIndexClientRpcFactory(Configuration conf) { super(conf); - this.delegate = new IndexQosRpcControllerFactory(conf); + this.delegate = new ServerRpcControllerFactory(conf); } @Override @@ -146,8 +146,8 @@ public class IndexHandlerIT { conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, CountingIndexClientRpcFactory.class.getName()); // and set the index table as the current table - conf.setStrings(IndexQosRpcControllerFactory.INDEX_TABLE_NAMES_KEY, - TestTable.getTableNameString()); +// conf.setStrings(PhoenixRpcControllerFactory.INDEX_TABLE_NAMES_KEY, +// TestTable.getTableNameString()); HTable 
table = new HTable(conf, TestTable.getTableName()); // do a write to the table @@ -159,7 +159,7 @@ public class IndexHandlerIT { // check the counts on the rpc controller assertEquals("Didn't get the expected number of index priority writes!", 1, (int) CountingIndexClientRpcController.priorityCounts - .get(QueryServicesOptions.DEFAULT_INDEX_MIN_PRIORITY)); + .get(QueryServicesOptions.DEFAULT_INDEX_PRIORITY)); table.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java deleted file mode 100644 index 7338b40..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexQosIT.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by - * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. 
- */ -package org.apache.phoenix.end2end.index; - -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; -import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; -import static org.apache.phoenix.util.TestUtil.LOCALHOST; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.util.List; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.ipc.BalancedQueueRpcExecutor; -import org.apache.hadoop.hbase.ipc.CallRunner; -import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.ipc.RpcExecutor; -import org.apache.hadoop.hbase.ipc.RpcScheduler; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; -import org.apache.phoenix.hbase.index.IndexQosRpcControllerFactory; -import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory; -import 
org.apache.phoenix.jdbc.PhoenixTestDriver; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.QueryUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.SchemaUtil; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; - - -@Category(NeedsOwnMiniClusterTest.class) -public class IndexQosIT extends BaseTest { - - private static final String SCHEMA_NAME = "S"; - private static final String INDEX_TABLE_NAME = "I"; - private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T"); - private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I"); - private static final int NUM_SLAVES = 2; - - private static String url; - private static PhoenixTestDriver driver; - private HBaseTestingUtility util; - private HBaseAdmin admin; - private Configuration conf; - private static RpcExecutor spyRpcExecutor = Mockito.spy(new BalancedQueueRpcExecutor("test-queue", 30, 1, 300)); - - /** - * Factory that uses a spyed RpcExecutor - */ - public static class TestPhoenixIndexRpcSchedulerFactory extends PhoenixIndexRpcSchedulerFactory { - @Override - public RpcScheduler create(Configuration conf, RegionServerServices services) { - PhoenixIndexRpcScheduler phoenixIndexRpcScheduler = (PhoenixIndexRpcScheduler)super.create(conf, services); - phoenixIndexRpcScheduler.setExecutorForTesting(spyRpcExecutor); - return phoenixIndexRpcScheduler; - } - } - - @Before - public void doSetup() throws Exception { - conf = HBaseConfiguration.create(); - setUpConfigForMiniCluster(conf); - conf.set(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - TestPhoenixIndexRpcSchedulerFactory.class.getName()); - conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, 
IndexQosRpcControllerFactory.class.getName()); - util = new HBaseTestingUtility(conf); - // start cluster with 2 region servers - util.startMiniCluster(NUM_SLAVES); - admin = util.getHBaseAdmin(); - String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB); - url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort - + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; - driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS); - } - - @After - public void tearDown() throws Exception { - try { - destroyDriver(driver); - if (admin!=null) { - admin.close(); - } - } finally { - util.shutdownMiniCluster(); - } - } - - @Test - public void testIndexWriteQos() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = driver.connect(url, props); - - // create the table - conn.createStatement().execute( - "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - - // create the index - conn.createStatement().execute( - "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); - - byte[] dataTableName = Bytes.toBytes(DATA_TABLE_FULL_NAME); - byte[] indexTableName = Bytes.toBytes(INDEX_TABLE_FULL_NAME); - MiniHBaseCluster cluster = util.getHBaseCluster(); - HMaster master = cluster.getMaster(); - AssignmentManager am = master.getAssignmentManager(); - - // verify there is only a single region for data table - List<HRegionInfo> tableRegions = admin.getTableRegions(dataTableName); - assertEquals("Expected single region for " + dataTableName, tableRegions.size(), 1); - HRegionInfo dataHri = tableRegions.get(0); - - // verify there is only a single region for index table - tableRegions = admin.getTableRegions(indexTableName); - HRegionInfo indexHri = tableRegions.get(0); - assertEquals("Expected single region for " + indexTableName, tableRegions.size(), 1); - - ServerName 
dataServerName = am.getRegionStates().getRegionServerOfRegion(dataHri); - ServerName indexServerName = am.getRegionStates().getRegionServerOfRegion(indexHri); - - // if data table and index table are on same region server, move the index table to the other region server - if (dataServerName.equals(indexServerName)) { - HRegionServer server1 = util.getHBaseCluster().getRegionServer(0); - HRegionServer server2 = util.getHBaseCluster().getRegionServer(1); - HRegionServer dstServer = null; - HRegionServer srcServer = null; - if (server1.getServerName().equals(indexServerName)) { - dstServer = server2; - srcServer = server1; - } else { - dstServer = server1; - srcServer = server2; - } - byte[] encodedRegionNameInBytes = indexHri.getEncodedNameAsBytes(); - admin.move(encodedRegionNameInBytes, Bytes.toBytes(dstServer.getServerName().getServerName())); - while (dstServer.getOnlineRegion(indexHri.getRegionName()) == null - || dstServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes) - || srcServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes) - || master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { - // wait for the move to be finished - Thread.sleep(1); - } - } - - dataHri = admin.getTableRegions(dataTableName).get(0); - dataServerName = am.getRegionStates().getRegionServerOfRegion(dataHri); - indexHri = admin.getTableRegions(indexTableName).get(0); - indexServerName = am.getRegionStates().getRegionServerOfRegion(indexHri); - - // verify index and data tables are on different servers - assertNotEquals("Index and Data table should be on different region servers dataServer " + dataServerName - + " indexServer " + indexServerName, dataServerName, indexServerName); - - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); - stmt.setString(1, "k1"); - stmt.setString(2, "v1"); - stmt.setString(3, "v2"); - stmt.execute(); - conn.commit(); - - // run select query that 
should use the index - String selectSql = "SELECT k, v2 from " + DATA_TABLE_FULL_NAME + " WHERE v1=?"; - stmt = conn.prepareStatement(selectSql); - stmt.setString(1, "v1"); - - // verify that the query does a range scan on the index table - ResultSet rs = stmt.executeQuery("EXPLAIN " + selectSql); - assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER S.I ['v1']", QueryUtil.getExplainPlan(rs)); - - // verify that the correct results are returned - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals("k1", rs.getString(1)); - assertEquals("v2", rs.getString(2)); - assertFalse(rs.next()); - - // drop index table - conn.createStatement().execute( - "DROP INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME ); - // create a data table with the same name as the index table - conn.createStatement().execute( - "CREATE TABLE " + INDEX_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - - // upsert one row to the table (which has the same table name as the previous index table) - stmt = conn.prepareStatement("UPSERT INTO " + INDEX_TABLE_FULL_NAME + " VALUES(?,?,?)"); - stmt.setString(1, "k1"); - stmt.setString(2, "v1"); - stmt.setString(3, "v2"); - stmt.execute(); - conn.commit(); - - // run select query on the new table - selectSql = "SELECT k, v2 from " + INDEX_TABLE_FULL_NAME + " WHERE v1=?"; - stmt = conn.prepareStatement(selectSql); - stmt.setString(1, "v1"); - - // verify that the correct results are returned - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals("k1", rs.getString(1)); - assertEquals("v2", rs.getString(2)); - assertFalse(rs.next()); - - // verify that that index queue is used only once (for the first upsert) - Mockito.verify(spyRpcExecutor).dispatch(Mockito.any(CallRunner.class)); - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java ---------------------------------------------------------------------- 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java new file mode 100644 index 0000000..eae890d --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
+ */ +package org.apache.phoenix.rpc; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.Maps; + +public class PhoenixClientRpcIT extends BaseOwnClusterHBaseManagedTimeIT { + + private static final String SCHEMA_NAME = "S"; + private static final String INDEX_TABLE_NAME = "I"; + private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T"); + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2); + serverProps.put(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + TestPhoenixIndexRpcSchedulerFactory.class.getName()); + serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName()); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); + clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName()); + NUM_SLAVES_BASE = 2; + 
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet() + .iterator())); + } + + @AfterClass + public static void doTeardown() throws Exception { + TestPhoenixIndexRpcSchedulerFactory.reset(); + } + + @Test + public void testIndexQos() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = driver.connect(getUrl(), props); + try { + // create the table + conn.createStatement().execute( + "CREATE TABLE " + DATA_TABLE_FULL_NAME + + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true"); + + // create the index + conn.createStatement().execute( + "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt.setString(1, "k1"); + stmt.setString(2, "v1"); + stmt.setString(3, "v2"); + stmt.execute(); + conn.commit(); + + // run select query that should use the index + String selectSql = "SELECT k, v2 from " + DATA_TABLE_FULL_NAME + " WHERE v1=?"; + stmt = conn.prepareStatement(selectSql); + stmt.setString(1, "v1"); + + // verify that the query does a range scan on the index table + ResultSet rs = stmt.executeQuery("EXPLAIN " + selectSql); + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER S.I ['v1']", QueryUtil.getExplainPlan(rs)); + + // verify that the correct results are returned + rs = stmt.executeQuery(); + assertTrue(rs.next()); + assertEquals("k1", rs.getString(1)); + assertEquals("v2", rs.getString(2)); + assertFalse(rs.next()); + + // verify that index queue is not used (since the index writes originate from a client an not a region server) + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class)); + } finally { + conn.close(); + } + } + + @Test + public void testMetadataQos() throws Exception { + Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = driver.connect(getUrl(), props); + try { + // create the table + conn.createStatement().execute("CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR)"); + // verify that that metadata queue is used at least once + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getMetadataRpcExecutor(), Mockito.atLeastOnce()).dispatch(Mockito.any(CallRunner.class)); + } finally { + conn.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java new file mode 100644 index 0000000..796a439 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.rpc; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.Maps; + +public class PhoenixServerRpcIT extends BaseOwnClusterHBaseManagedTimeIT { + + private static final String SCHEMA_NAME = "S"; + private static final String INDEX_TABLE_NAME = "I"; + private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T"); + private static final String INDEX_TABLE_FULL_NAME = 
SchemaUtil.getTableName(SCHEMA_NAME, "I"); + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2); + serverProps.put(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + TestPhoenixIndexRpcSchedulerFactory.class.getName()); + serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName()); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); + clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, RpcControllerFactory.class.getName()); + NUM_SLAVES_BASE = 2; + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + @AfterClass + public static void doTeardown() throws Exception { + TestPhoenixIndexRpcSchedulerFactory.reset(); + } + + @Test + public void testIndexQos() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = driver.connect(getUrl(), props); + try { + // create the table + conn.createStatement().execute( + "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + + // create the index + conn.createStatement().execute( + "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); + + ensureTablesOnDifferentRegionServers(DATA_TABLE_FULL_NAME, INDEX_TABLE_FULL_NAME); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt.setString(1, "k1"); + stmt.setString(2, "v1"); + stmt.setString(3, "v2"); + stmt.execute(); + conn.commit(); + + // run select query that should use the index + String selectSql = "SELECT k, v2 from " + DATA_TABLE_FULL_NAME + " WHERE v1=?"; + stmt = conn.prepareStatement(selectSql); + stmt.setString(1, "v1"); + + // verify that the query does a range scan on the index table + ResultSet rs = stmt.executeQuery("EXPLAIN " 
+ selectSql); + assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER S.I ['v1']", QueryUtil.getExplainPlan(rs)); + + // verify that the correct results are returned + rs = stmt.executeQuery(); + assertTrue(rs.next()); + assertEquals("k1", rs.getString(1)); + assertEquals("v2", rs.getString(2)); + assertFalse(rs.next()); + + // drop index table + conn.createStatement().execute( + "DROP INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME ); + // create a data table with the same name as the index table + conn.createStatement().execute( + "CREATE TABLE " + INDEX_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + + // upsert one row to the table (which has the same table name as the previous index table) + stmt = conn.prepareStatement("UPSERT INTO " + INDEX_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt.setString(1, "k1"); + stmt.setString(2, "v1"); + stmt.setString(3, "v2"); + stmt.execute(); + conn.commit(); + + // run select query on the new table + selectSql = "SELECT k, v2 from " + INDEX_TABLE_FULL_NAME + " WHERE v1=?"; + stmt = conn.prepareStatement(selectSql); + stmt.setString(1, "v1"); + + // verify that the correct results are returned + rs = stmt.executeQuery(); + assertTrue(rs.next()); + assertEquals("k1", rs.getString(1)); + assertEquals("v2", rs.getString(2)); + assertFalse(rs.next()); + + // verify that that index queue is used only once (for the first upsert) + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); + } + finally { + conn.close(); + } + } + + /** + * Verifies that the given tables each have a single region and are on + * different region servers. If they are on the same server moves tableName2 + * to the other region server. 
+ */ + private void ensureTablesOnDifferentRegionServers(String tableName1, String tableName2) throws Exception { + byte[] table1 = Bytes.toBytes(tableName1); + byte[] table2 = Bytes.toBytes(tableName2); + HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TEST_PROPERTIES).getAdmin(); + HBaseTestingUtility util = getUtility(); + MiniHBaseCluster cluster = util.getHBaseCluster(); + HMaster master = cluster.getMaster(); + AssignmentManager am = master.getAssignmentManager(); + + // verify there is only a single region for data table + List<HRegionInfo> tableRegions = admin.getTableRegions(table1); + assertEquals("Expected single region for " + table1, tableRegions.size(), 1); + HRegionInfo hri1 = tableRegions.get(0); + + // verify there is only a single region for index table + tableRegions = admin.getTableRegions(table2); + HRegionInfo hri2 = tableRegions.get(0); + assertEquals("Expected single region for " + table2, tableRegions.size(), 1); + + ServerName serverName1 = am.getRegionStates().getRegionServerOfRegion(hri1); + ServerName serverName2 = am.getRegionStates().getRegionServerOfRegion(hri2); + + // if data table and index table are on same region server, move the index table to the other region server + if (serverName1.equals(serverName2)) { + HRegionServer server1 = util.getHBaseCluster().getRegionServer(0); + HRegionServer server2 = util.getHBaseCluster().getRegionServer(1); + HRegionServer dstServer = null; + HRegionServer srcServer = null; + if (server1.getServerName().equals(serverName2)) { + dstServer = server2; + srcServer = server1; + } else { + dstServer = server1; + srcServer = server2; + } + byte[] encodedRegionNameInBytes = hri2.getEncodedNameAsBytes(); + admin.move(encodedRegionNameInBytes, Bytes.toBytes(dstServer.getServerName().getServerName())); + while (dstServer.getOnlineRegion(hri2.getRegionName()) == null + || dstServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes) + || 
srcServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameInBytes) + || master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + // wait for the move to be finished + Thread.sleep(1); + } + } + + hri1 = admin.getTableRegions(table1).get(0); + serverName1 = am.getRegionStates().getRegionServerOfRegion(hri1); + hri2 = admin.getTableRegions(table2).get(0); + serverName2 = am.getRegionStates().getRegionServerOfRegion(hri2); + + // verify index and data tables are on different servers + assertNotEquals("Tables " + tableName1 + " and " + tableName2 + " should be on different region servers", serverName1, serverName2); + } + + @Test + public void testMetadataQos() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = driver.connect(getUrl(), props); + try { + ensureTablesOnDifferentRegionServers(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); + // create the table + conn.createStatement().execute( + "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR)"); + // query the table from another connection, so that SYSTEM.STATS will be used + conn.createStatement().execute("SELECT * FROM "+DATA_TABLE_FULL_NAME); + // verify that that metadata queue is used once + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getMetadataRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); + } + finally { + conn.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java new file mode 100644 index 0000000..b27f4c1 --- /dev/null +++ 
b/phoenix-core/src/it/java/org/apache/phoenix/rpc/TestPhoenixIndexRpcSchedulerFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.rpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ipc.BalancedQueueRpcExecutor; +import org.apache.hadoop.hbase.ipc.PhoenixRpcScheduler; +import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import org.apache.hadoop.hbase.ipc.RpcExecutor; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.mockito.Mockito; + +public class TestPhoenixIndexRpcSchedulerFactory extends PhoenixRpcSchedulerFactory { + + private static RpcExecutor indexRpcExecutor = Mockito.spy(new BalancedQueueRpcExecutor("test-index-queue", 30, 1, + 300)); + private static RpcExecutor metadataRpcExecutor = Mockito.spy(new BalancedQueueRpcExecutor("test-metataqueue", 30, + 1, 300)); + + @Override + public RpcScheduler create(Configuration conf, RegionServerServices services) { + PhoenixRpcScheduler phoenixIndexRpcScheduler = (PhoenixRpcScheduler)super.create(conf, services); + phoenixIndexRpcScheduler.setIndexExecutorForTesting(indexRpcExecutor); + 
phoenixIndexRpcScheduler.setMetadataExecutorForTesting(metadataRpcExecutor); + return phoenixIndexRpcScheduler; + } + + public static RpcExecutor getIndexRpcExecutor() { + return indexRpcExecutor; + } + + public static RpcExecutor getMetadataRpcExecutor() { + return metadataRpcExecutor; + } + + public static void reset() { + Mockito.reset(metadataRpcExecutor); + Mockito.reset(indexRpcExecutor); + } +} + + http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java deleted file mode 100644 index 4709304..0000000 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcScheduler.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.ipc; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; - -import com.google.common.annotations.VisibleForTesting; - -/** - * {@link RpcScheduler} that first checks to see if this is an index update before passing off the - * call to the delegate {@link RpcScheduler}. - * <p> - * We reserve the range (1000, 1050], by default (though it is configurable), for index priority - * writes. Currently, we don't do any prioritization within that range - all index writes are - * treated with the same priority and put into the same queue. - */ -public class PhoenixIndexRpcScheduler extends RpcScheduler { - - // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 - public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "ipc.server.callqueue.read.share"; - public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = - "ipc.server.callqueue.handler.factor"; - private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; - - private RpcScheduler delegate; - private int minPriority; - private int maxPriority; - private RpcExecutor callExecutor; - private int port; - - public PhoenixIndexRpcScheduler(int indexHandlerCount, Configuration conf, - RpcScheduler delegate, int minPriority, int maxPriority) { - int maxQueueLength = - conf.getInt("ipc.server.max.callqueue.length", indexHandlerCount - * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); - - // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 - float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); - int numCallQueues = - Math.max(1, Math.round(indexHandlerCount * callQueuesHandlersFactor)); - - this.minPriority = minPriority; - this.maxPriority = maxPriority; - this.delegate = delegate; - - this.callExecutor = - new BalancedQueueRpcExecutor("Index", indexHandlerCount, numCallQueues, - maxQueueLength); - } - - @Override - public void init(Context context) { - 
delegate.init(context); - this.port = context.getListenerAddress().getPort(); - } - - @Override - public void start() { - delegate.start(); - callExecutor.start(port); - } - - @Override - public void stop() { - delegate.stop(); - callExecutor.stop(); - } - - @Override - public void dispatch(CallRunner callTask) throws InterruptedException, IOException { - RpcServer.Call call = callTask.getCall(); - int priority = call.header.getPriority(); - if (minPriority <= priority && priority < maxPriority) { - callExecutor.dispatch(callTask); - } else { - delegate.dispatch(callTask); - } - } - - @Override - public int getGeneralQueueLength() { - // not the best way to calculate, but don't have a better way to hook - // into metrics at the moment - return this.delegate.getGeneralQueueLength() + this.callExecutor.getQueueLength(); - } - - @Override - public int getPriorityQueueLength() { - return this.delegate.getPriorityQueueLength(); - } - - @Override - public int getReplicationQueueLength() { - return this.delegate.getReplicationQueueLength(); - } - - @Override - public int getActiveRpcHandlerCount() { - return this.delegate.getActiveRpcHandlerCount() + this.callExecutor.getActiveHandlerCount(); - } - - @VisibleForTesting - public void setExecutorForTesting(RpcExecutor executor) { - this.callExecutor = executor; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java new file mode 100644 index 0000000..e721271 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.annotations.VisibleForTesting; + +/** + * {@link RpcScheduler} that first checks to see if this is an index or metadata update before passing off the + * call to the delegate {@link RpcScheduler}. 
+ */ +public class PhoenixRpcScheduler extends RpcScheduler { + + // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 + private static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "ipc.server.callqueue.handler.factor"; + private static final String CALLQUEUE_LENGTH_CONF_KEY = "ipc.server.max.callqueue.length"; + private static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; + + private RpcScheduler delegate; + private int indexPriority; + private int metadataPriority; + private RpcExecutor indexCallExecutor; + private RpcExecutor metadataCallExecutor; + private int port; + + public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority) { + // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 + int maxQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); + int numQueues = Math.max(1, Math.round(callQueuesHandlersFactor)); + + this.indexPriority = indexPriority; + this.metadataPriority = metadataPriority; + this.delegate = delegate; + this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", 1, numQueues, maxQueueLength); + this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", 1, numQueues, maxQueueLength); + } + + @Override + public void init(Context context) { + delegate.init(context); + this.port = context.getListenerAddress().getPort(); + } + + @Override + public void start() { + delegate.start(); + indexCallExecutor.start(port); + metadataCallExecutor.start(port); + } + + @Override + public void stop() { + delegate.stop(); + indexCallExecutor.stop(); + metadataCallExecutor.stop(); + } + + @Override + public void dispatch(CallRunner callTask) throws InterruptedException, IOException { + RpcServer.Call call = callTask.getCall(); + int priority = call.header.getPriority(); + if (indexPriority == priority) 
{ + indexCallExecutor.dispatch(callTask); + } else if (metadataPriority == priority) { + metadataCallExecutor.dispatch(callTask); + } else { + delegate.dispatch(callTask); + } + } + + @Override + public int getGeneralQueueLength() { + // not the best way to calculate, but don't have a better way to hook + // into metrics at the moment + return this.delegate.getGeneralQueueLength() + this.indexCallExecutor.getQueueLength() + this.metadataCallExecutor.getQueueLength(); + } + + @Override + public int getPriorityQueueLength() { + return this.delegate.getPriorityQueueLength(); + } + + @Override + public int getReplicationQueueLength() { + return this.delegate.getReplicationQueueLength(); + } + + @Override + public int getActiveRpcHandlerCount() { + return this.delegate.getActiveRpcHandlerCount() + this.indexCallExecutor.getActiveHandlerCount() + this.metadataCallExecutor.getActiveHandlerCount(); + } + + @VisibleForTesting + public void setIndexExecutorForTesting(RpcExecutor executor) { + this.indexCallExecutor = executor; + } + + @VisibleForTesting + public void setMetadataExecutorForTesting(RpcExecutor executor) { + this.metadataCallExecutor = executor; + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java new file mode 100644 index 0000000..e666c52 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; + +import com.google.common.base.Preconditions; + +/** + * Factory to create a {@link PhoenixRpcScheduler}. In this package so we can access the + * {@link SimpleRpcSchedulerFactory}. 
+ */ +public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory { + + private static final Log LOG = LogFactory.getLog(PhoenixRpcSchedulerFactory.class); + + private static final String VERSION_TOO_OLD_FOR_INDEX_RPC = + "Running an older version of HBase (less than 0.98.4), Phoenix index RPC handling cannot be enabled."; + + @Override + public RpcScheduler create(Configuration conf, RegionServerServices services) { + // create the delegate scheduler + RpcScheduler delegate; + try { + // happens in <=0.98.4 where the scheduler factory is not visible + delegate = new SimpleRpcSchedulerFactory().create(conf, services); + } catch (IllegalAccessError e) { + LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC); + throw e; + } + try { + // make sure we are on a version that phoenix can support + Class.forName("org.apache.hadoop.hbase.ipc.RpcExecutor"); + } catch (ClassNotFoundException e) { + LOG.error(VERSION_TOO_OLD_FOR_INDEX_RPC + + " Instead, using falling back to Simple RPC scheduling."); + return delegate; + } + + // get the index priority configs + int indexPriority = getIndexPriority(conf); + validatePriority(indexPriority); + // get the metadata priority configs + int metadataPriority = getMetadataPriority(conf); + validatePriority(metadataPriority); + + // validate index and metadata priorities are not the same + Preconditions.checkArgument(indexPriority != metadataPriority, "Index and Metadata priority must not be same "+ indexPriority); + LOG.info("Using custom Phoenix Index RPC Handling with index rpc priority " + indexPriority + " and metadata rpc priority " + metadataPriority); + + PhoenixRpcScheduler scheduler = + new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority); + return scheduler; + } + + /** + * Validates that the given priority does not overlap with the HBase priority range + */ + private void validatePriority(int priority) { + Preconditions.checkArgument( priority < HConstants.NORMAL_QOS || priority > HConstants.HIGH_QOS, 
"priority cannot be within hbase priority range " + + HConstants.NORMAL_QOS +" to " + HConstants.HIGH_QOS ); + } + + public static int getIndexPriority(Configuration conf) { + return conf.getInt(QueryServices.INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_PRIORITY); + } + + public static int getMetadataPriority(Configuration conf) { + return conf.getInt(QueryServices.METADATA_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_PRIORITY); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java new file mode 100644 index 0000000..5a7dcc2 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ClientRpcControllerFactory.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc.controller; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +/** + * {@link RpcControllerFactory} that sets the priority of metadata rpc calls to be processed + * in its own queue. + */ +public class ClientRpcControllerFactory extends RpcControllerFactory { + + public ClientRpcControllerFactory(Configuration conf) { + super(conf); + } + + @Override + public PayloadCarryingRpcController newController() { + PayloadCarryingRpcController delegate = super.newController(); + return getController(delegate); + } + + @Override + public PayloadCarryingRpcController newController(CellScanner cellScanner) { + PayloadCarryingRpcController delegate = super.newController(cellScanner); + return getController(delegate); + } + + @Override + public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) { + PayloadCarryingRpcController delegate = super.newController(cellIterables); + return getController(delegate); + } + + private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) { + return new MetadataRpcController(delegate, conf); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java new file mode 100644 index 0000000..fdb1d33 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/IndexRpcController.java @@ -0,0 +1,51 @@ +/** 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc.controller; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; + +class IndexRpcController extends DelegatingPayloadCarryingRpcController { + + private final int priority; + private final String tracingTableName; + + public IndexRpcController(PayloadCarryingRpcController delegate, Configuration conf) { + super(delegate); + this.priority = PhoenixRpcSchedulerFactory.getIndexPriority(conf); + this.tracingTableName = conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB, + QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME); + } + + @Override + public void setPriority(final TableName tn) { + if (!tn.isSystemTable() && !tn.getNameAsString().equals(tracingTableName)) { + setPriority(this.priority); + } + else { + super.setPriority(tn); + } + } + + +} 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java new file mode 100644 index 0000000..23b9f03 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/MetadataRpcController.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc.controller; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; + +import com.google.common.collect.ImmutableList; + +class MetadataRpcController extends DelegatingPayloadCarryingRpcController { + + private int priority; + // list of system tables + private static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>() + .add(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME) + .add(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME) + .add(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME).build(); + + public MetadataRpcController(PayloadCarryingRpcController delegate, + Configuration conf) { + super(delegate); + this.priority = PhoenixRpcSchedulerFactory.getMetadataPriority(conf); + } + + @Override + public void setPriority(final TableName tn) { + if (SYSTEM_TABLE_NAMES.contains(tn.getNameAsString())) { + setPriority(this.priority); + } else { + super.setPriority(tn); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java new file mode 100644 index 0000000..8c17eda --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerRpcControllerFactory.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc.controller; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +/** + * {@link RpcControllerFactory} that sets the priority of index and metadata rpc calls + * so that they are each processed in their own queues + */ +public class ServerRpcControllerFactory extends RpcControllerFactory { + + public ServerRpcControllerFactory(Configuration conf) { + super(conf); + } + + @Override + public PayloadCarryingRpcController newController() { + PayloadCarryingRpcController delegate = super.newController(); + return getController(delegate); + } + + @Override + public PayloadCarryingRpcController newController(CellScanner cellScanner) { + PayloadCarryingRpcController delegate = super.newController(cellScanner); + return getController(delegate); + } + + @Override + public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) { + PayloadCarryingRpcController delegate = super.newController(cellIterables); + return getController(delegate); + } + + private PayloadCarryingRpcController 
getController(PayloadCarryingRpcController delegate) { + // construct a chain of controllers: metadata, index and standard controller + IndexRpcController indexRpcController = new IndexRpcController(delegate, conf); + return new MetadataRpcController(indexRpcController, conf); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java deleted file mode 100644 index a192feb..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexQosRpcControllerFactory.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.hbase.index; - -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory; -import org.apache.phoenix.util.SchemaUtil; - -/** - * {@link RpcControllerFactory} that overrides the standard {@link PayloadCarryingRpcController} to - * allow the configured index tables (via {@link #INDEX_TABLE_NAMES_KEY}) to use the Index priority. - */ -public class IndexQosRpcControllerFactory extends RpcControllerFactory { - - public static final String INDEX_TABLE_NAMES_KEY = "phoenix.index.rpc.controller.index-tables"; - - public IndexQosRpcControllerFactory(Configuration conf) { - super(conf); - } - - @Override - public PayloadCarryingRpcController newController() { - PayloadCarryingRpcController delegate = super.newController(); - return new IndexQosRpcController(delegate, conf); - } - - @Override - public PayloadCarryingRpcController newController(CellScanner cellScanner) { - PayloadCarryingRpcController delegate = super.newController(cellScanner); - return new IndexQosRpcController(delegate, conf); - } - - @Override - public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) { - PayloadCarryingRpcController delegate = super.newController(cellIterables); - return new IndexQosRpcController(delegate, conf); - } - - private class IndexQosRpcController extends DelegatingPayloadCarryingRpcController { - - private int priority; - - public IndexQosRpcController(PayloadCarryingRpcController delegate, Configuration conf) { - super(delegate); - this.priority = PhoenixIndexRpcSchedulerFactory.getMinPriority(conf); - 
} - @Override - public void setPriority(final TableName tn) { - // if its an index table, then we override to the index priority - if (!tn.isSystemTable() && !SchemaUtil.isSystemDataTable(tn.getNameAsString())) { - setPriority(this.priority); - } - else { - super.setPriority(tn); - } - } - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java deleted file mode 100644 index 8e0b86f..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ipc/PhoenixIndexRpcSchedulerFactory.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.hbase.index.ipc; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ipc.PhoenixIndexRpcScheduler; -import org.apache.hadoop.hbase.ipc.RpcScheduler; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; -import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; - -import com.google.common.base.Preconditions; - -/** - * Factory to create a {@link PhoenixIndexRpcScheduler}. In this package so we can access the - * {@link SimpleRpcSchedulerFactory}. - */ -public class PhoenixIndexRpcSchedulerFactory implements RpcSchedulerFactory { - - private static final Log LOG = LogFactory.getLog(PhoenixIndexRpcSchedulerFactory.class); - - private static final String VERSION_TOO_OLD_FOR_INDEX_RPC = - "Running an older version of HBase (less than 0.98.4), Phoenix index RPC handling cannot be enabled."; - - @Override - public RpcScheduler create(Configuration conf, RegionServerServices services) { - // create the delegate scheduler - RpcScheduler delegate; - try { - // happens in <=0.98.4 where the scheduler factory is not visible - delegate = new SimpleRpcSchedulerFactory().create(conf, services); - } catch (IllegalAccessError e) { - LOG.fatal(VERSION_TOO_OLD_FOR_INDEX_RPC); - throw e; - } - try { - // make sure we are on a version that phoenix can support - Class.forName("org.apache.hadoop.hbase.ipc.RpcExecutor"); - } catch (ClassNotFoundException e) { - LOG.error(VERSION_TOO_OLD_FOR_INDEX_RPC - + " Instead, using falling back to Simple RPC scheduling."); - return delegate; - } - - int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, 
QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT); - int minPriority = getMinPriority(conf); - int maxPriority = conf.getInt(QueryServices.MAX_INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MAX_PRIORITY); - // make sure the ranges are outside the warning ranges - Preconditions.checkArgument(maxPriority > minPriority, "Max index priority (" + maxPriority - + ") must be larger than min priority (" + minPriority + ")"); - boolean allSmaller = - minPriority < HConstants.REPLICATION_QOS - && maxPriority < HConstants.REPLICATION_QOS; - boolean allLarger = minPriority > HConstants.HIGH_QOS; - Preconditions.checkArgument(allSmaller || allLarger, "Index priority range (" + minPriority - + ", " + maxPriority + ") must be outside HBase priority range (" - + HConstants.REPLICATION_QOS + ", " + HConstants.HIGH_QOS + ")"); - - LOG.info("Using custom Phoenix Index RPC Handling with " + indexHandlerCount - + " handlers and priority range [" + minPriority + ", " + maxPriority + ")"); - - PhoenixIndexRpcScheduler scheduler = - new PhoenixIndexRpcScheduler(indexHandlerCount, conf, delegate, minPriority, - maxPriority); - return scheduler; - } - - public static int getMinPriority(Configuration conf) { - return conf.getInt(QueryServices.MIN_INDEX_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_MIN_PRIORITY); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 15bcfd0..1b8b57d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -279,10 +279,6 @@ public class 
PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho /** Version below which we fall back on the generic KeyValueBuilder */ public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14"); - // list of system tables - public static final List<String> SYSTEM_TABLE_NAMES = new ImmutableList.Builder<String>().add(SYSTEM_CATALOG_NAME) - .add(SYSTEM_STATS_NAME).add(SEQUENCE_FULLNAME).build(); - PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException { this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new PhoenixStatement(connection)); this.connection = connection; http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 2eab5dd..65f6acf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -123,9 +123,8 @@ public interface QueryServices extends SQLCloseable { // Index will be partially re-built from index disable time stamp - following overlap time public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB = "phoenix.index.failure.handling.rebuild.overlap.time"; - public static final String MIN_INDEX_PRIOIRTY_ATTRIB = "phoenix.regionserver.index.priority.min"; - public static final String MAX_INDEX_PRIOIRTY_ATTRIB = "phoenix.regionserver.index.priority.max"; - public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.regionserver.index.handler.count"; + public static final String INDEX_PRIOIRTY_ATTRIB = "phoenix.index.rpc.priority"; + public static final String METADATA_PRIOIRTY_ATTRIB = 
"phoenix.metadata.rpc.priority"; public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex"; // Config parameters for for configuring tracing http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 8cd740a..97040d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -41,6 +41,7 @@ import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LI import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB; +import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED; import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK; @@ -61,12 +62,13 @@ import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTR import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; -import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Coprocessor; +import 
org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.trace.util.Tracing; @@ -138,13 +140,12 @@ public class QueryServicesOptions { public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins - public static final int DEFAULT_INDEX_MAX_PRIORITY = 1050; /** * HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate * and give some room for things in the middle */ - public static final int DEFAULT_INDEX_MIN_PRIORITY = 1000; - public static final int DEFAULT_INDEX_HANDLER_COUNT = 30; + public static final int DEFAULT_INDEX_PRIORITY = 1000; + public static final int DEFAULT_METADATA_PRIORITY = 2000; public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true; public static final int DEFAULT_TRACING_PAGE_SIZE = 100; @@ -235,7 +236,8 @@ public class QueryServicesOptions { .setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE) .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK) .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK) - .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED); + .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED) + .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName()); ; // HBase sets this to 1, so we reset it to something more appropriate. 
// Hopefully HBase will change this, because we can't know if a user set http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index 3bd5057..022033d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -404,13 +404,6 @@ public class SchemaUtil { return false; } - /** - * Returns true if the given table is a system table (does not include future system indexes) - */ - public static boolean isSystemDataTable(String fullTableName) { - return PhoenixDatabaseMetaData.SYSTEM_TABLE_NAMES.contains(fullTableName); - } - // Given the splits and the rowKeySchema, find out the keys that public static byte[][] processSplits(byte[][] splits, LinkedHashSet<PColumn> pkColumns, Integer saltBucketNum, boolean defaultRowKeyOrder) throws SQLException { // FIXME: shouldn't this return if splits.length == 0? 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java index ec18d9b..8f26deb 100644 --- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java @@ -42,9 +42,9 @@ public class PhoenixIndexRpcSchedulerTest { public void testIndexPriorityWritesToIndexHandler() throws Exception { RpcScheduler mock = Mockito.mock(RpcScheduler.class); - PhoenixIndexRpcScheduler scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 200, 250); + PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250); BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1, 1); - scheduler.setExecutorForTesting(executor); + scheduler.setIndexExecutorForTesting(executor); dispatchCallWithPriority(scheduler, 200); List<BlockingQueue<CallRunner>> queues = executor.getQueues(); assertEquals(1, queues.size()); @@ -52,8 +52,8 @@ public class PhoenixIndexRpcSchedulerTest { queue.poll(20, TimeUnit.SECONDS); // try again, this time we tweak the ranges we support - scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110); - scheduler.setExecutorForTesting(executor); + scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110); + scheduler.setIndexExecutorForTesting(executor); dispatchCallWithPriority(scheduler, 101); queue.poll(20, TimeUnit.SECONDS); @@ -69,14 +69,14 @@ public class PhoenixIndexRpcSchedulerTest { @Test public void testDelegateWhenOutsideRange() throws Exception { RpcScheduler mock = Mockito.mock(RpcScheduler.class); - PhoenixIndexRpcScheduler scheduler = new 
PhoenixIndexRpcScheduler(10, conf, mock, 200, 250); + PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250); dispatchCallWithPriority(scheduler, 100); - dispatchCallWithPriority(scheduler, 250); + dispatchCallWithPriority(scheduler, 251); // try again, this time we tweak the ranges we support - scheduler = new PhoenixIndexRpcScheduler(10, conf, mock, 101, 110); + scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110); dispatchCallWithPriority(scheduler, 200); - dispatchCallWithPriority(scheduler, 110); + dispatchCallWithPriority(scheduler, 111); Mockito.verify(mock, Mockito.times(4)).init(Mockito.any(Context.class)); Mockito.verify(mock, Mockito.times(4)).dispatch(Mockito.any(CallRunner.class)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/0b49b28f/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java deleted file mode 100644 index 4918bba..0000000 --- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/PhoenixIndexRpcSchedulerFactoryTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.hadoop.conf.Configuration; -import org.apache.phoenix.hbase.index.ipc.PhoenixIndexRpcSchedulerFactory; -import org.apache.phoenix.query.QueryServices; -import org.junit.Test; - -public class PhoenixIndexRpcSchedulerFactoryTest { - - @Test - public void ensureInstantiation() throws Exception { - Configuration conf = new Configuration(false); - conf.setClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - PhoenixIndexRpcSchedulerFactory.class, RpcSchedulerFactory.class); - // kinda lame that we copy the copy from the regionserver to do this and can't use a static - // method, but meh - try { - Class<?> rpcSchedulerFactoryClass = - conf.getClass(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - SimpleRpcSchedulerFactory.class); - Object o = rpcSchedulerFactoryClass.newInstance(); - assertTrue(o instanceof PhoenixIndexRpcSchedulerFactory); - } catch (InstantiationException e) { - assertTrue("Should not have got an exception when instantiing the rpc scheduler: " + e, - false); - } catch (IllegalAccessException e) { - assertTrue("Should not have got an exception when instantiing the rpc scheduler: " + e, - false); - } - } - - /** - * Ensure that we can't configure the index priority ranges inside the hbase ranges - * @throws Exception - */ - @Test - public void testValidateIndexPriorityRanges() throws Exception { - Configuration conf = new Configuration(false); - // standard configs 
should be fine - PhoenixIndexRpcSchedulerFactory factory = new PhoenixIndexRpcSchedulerFactory(); - factory.create(conf, null); - - setMinMax(conf, 0, 4); - factory.create(conf, null); - - setMinMax(conf, 101, 102); - factory.create(conf, null); - - setMinMax(conf, 102, 101); - try { - factory.create(conf, null); - fail("Should not have allowed max less than min"); - } catch (IllegalArgumentException e) { - // expected - } - - setMinMax(conf, 5, 6); - try { - factory.create(conf, null); - fail("Should not have allowed min in range"); - } catch (IllegalArgumentException e) { - // expected - } - - setMinMax(conf, 6, 60); - try { - factory.create(conf, null); - fail("Should not have allowed min/max in hbase range"); - } catch (IllegalArgumentException e) { - // expected - } - - setMinMax(conf, 6, 101); - try { - factory.create(conf, null); - fail("Should not have allowed in range"); - } catch (IllegalArgumentException e) { - // expected - } - } - - private void setMinMax(Configuration conf, int min, int max) { - conf.setInt(QueryServices.MIN_INDEX_PRIOIRTY_ATTRIB, min); - conf.setInt(QueryServices.MAX_INDEX_PRIOIRTY_ATTRIB, max); - } -} \ No newline at end of file