PHOENIX-2519 Prevent RPC for SYSTEM tables when querying
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/72f80bd9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/72f80bd9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/72f80bd9 Branch: refs/heads/4.x-HBase-1.0 Commit: 72f80bd9d1452d0fca72aa5a3fcbe3996df8ca52 Parents: 8726efd Author: James Taylor <[email protected]> Authored: Sun Dec 13 14:31:00 2015 -0800 Committer: James Taylor <[email protected]> Committed: Mon Dec 14 11:52:00 2015 -0800 ---------------------------------------------------------------------- .../org/apache/phoenix/rpc/UpdateCacheIT.java | 46 +++++++++++++++----- .../phoenix/rpc/UpdateCacheWithScnIT.java | 2 +- .../apache/phoenix/execute/BaseQueryPlan.java | 12 ++++- 3 files changed, 47 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/72f80bd9/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java index 3b3c6c7..208af94 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -78,26 +79,49 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT { ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE); } + private static void setupSystemTable(Long scn) throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + if (scn != null) { + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn)); + } + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute( + "create table " + QueryConstants.SYSTEM_SCHEMA_NAME + QueryConstants.NAME_SEPARATOR + MUTABLE_INDEX_DATA_TABLE + TEST_TABLE_SCHEMA); + } + } + @Test public void testUpdateCacheForTxnTable() throws Exception { - helpTestUpdateCache(true, null); + helpTestUpdateCache(true, false, null); } @Test public void testUpdateCacheForNonTxnTable() throws Exception { - helpTestUpdateCache(false, null); + helpTestUpdateCache(false, false, null); } - public static void helpTestUpdateCache(boolean isTransactional, Long scn) throws Exception { + @Test + public void testUpdateCacheForNonTxnSystemTable() throws Exception { + helpTestUpdateCache(false, true, null); + } + + public static void helpTestUpdateCache(boolean isTransactional, boolean isSystem, Long scn) throws Exception { String tableName = isTransactional ? TRANSACTIONAL_DATA_TABLE : MUTABLE_INDEX_DATA_TABLE; - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName; + String schemaName; + if (isSystem) { + setupSystemTable(scn); + schemaName = QueryConstants.SYSTEM_SCHEMA_NAME; + } else { + schemaName = INDEX_DATA_SCHEMA; + } + String fullTableName = schemaName + QueryConstants.NAME_SEPARATOR + tableName; String selectSql = "SELECT * FROM "+fullTableName; // use a spyed ConnectionQueryServices so we can verify calls to getTable ConnectionQueryServices connectionQueryServices = Mockito.spy(driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))); Properties props = new Properties(); props.putAll(PhoenixEmbeddedDriver.DEFFAULT_PROPS.asMap()); if (scn!=null) { - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn)); + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+10)); } Connection conn = connectionQueryServices.connect(getUrl(), props); try { @@ -112,13 +136,14 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT { TestUtil.setRowKeyColumns(stmt, 3); stmt.execute(); conn.commit(); - // verify only one rpc to fetch table metadata, - verify(connectionQueryServices).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong()); + int numUpsertRpcs = isSystem ? 0 : 1; + // verify only 0 or 1 rpc to fetch table metadata, + verify(connectionQueryServices, times(numUpsertRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(schemaName)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong()); reset(connectionQueryServices); if (scn!=null) { // advance scn so that we can see the data we just upserted - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+2)); + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+20)); conn = connectionQueryServices.connect(getUrl(), props); } @@ -143,8 +168,9 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT { // for non-transactional tables without a scn : verify one rpc to getTable occurs *per* query // for non-transactional tables with a scn : verify *only* one rpc occurs // for transactional tables : verify *only* one rpc occurs - int numRpcs = isTransactional || scn!=null ? 1 : 3; - verify(connectionQueryServices, times(numRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong()); + // for non-transactional, system tables : verify non rpc occurs + int numRpcs = isSystem ? 0 : (isTransactional || scn!=null ? 1 : 3); + verify(connectionQueryServices, times(numRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(schemaName)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong()); } finally { conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/72f80bd9/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java index dbc7fd1..5ff2fb0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java @@ -35,7 +35,7 @@ public class UpdateCacheWithScnIT extends BaseClientManagedTimeIT { @Test public void testUpdateCacheWithScn() throws Exception { - UpdateCacheIT.helpTestUpdateCache(false, ts+2); + UpdateCacheIT.helpTestUpdateCache(false, false, ts+2); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/72f80bd9/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 91c2f19..7515642 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; @@ -57,6 +58,7 @@ import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.TableName; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; @@ -203,7 +205,8 @@ public abstract class BaseQueryPlan implements QueryPlan { // Set miscellaneous scan attributes. This is the last chance to set them before we // clone the scan for each parallelized chunk. Scan scan = context.getScan(); - PTable table = context.getCurrentTable().getTable(); + TableRef tableRef = context.getCurrentTable(); + PTable table = tableRef.getTable(); if (dynamicFilter != null) { WhereCompiler.compile(context, statement, null, Collections.singletonList(dynamicFilter), false, null); @@ -232,7 +235,12 @@ public abstract class BaseQueryPlan implements QueryPlan { TimeRange scanTimeRange = scan.getTimeRange(); Long scn = connection.getSCN(); if (scn == null) { - scn = context.getCurrentTime(); + // If we haven't resolved the time at the beginning of compilation, don't + // force the lookup on the server, but use HConstants.LATEST_TIMESTAMP instead. + scn = tableRef.getTimeStamp(); + if (scn == QueryConstants.UNSET_TIMESTAMP) { + scn = HConstants.LATEST_TIMESTAMP; + } } try { TimeRange timeRangeToUse = ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn);
