use htable to process mutations that aren't on the current region server
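
On the system-catalog branch SYSTEM.CATALOG may be split across regions, so the rows that doDropTable has to scan and delete are not guaranteed to live in the region running the MetaDataEndpointImpl coprocessor. The change below adds a small ScannerAdaptor that reads through the local RegionScanner when the row is hosted here and falls back to an HTable scan of SYSTEM.CATALOG otherwise. A minimal sketch of that dispatch (illustrative only, not code from the patch; region, key and scan are the doDropTable locals and the names follow the diff):

    // Sketch, not code from the patch: choose the scanner backend per row key.
    boolean regionHasRow = region.getRegionInfo().containsRow(key);
    // HTable mode when the row lives in some other region of SYSTEM.CATALOG
    ScannerAdaptor scannerAdaptor = new ScannerAdaptor(scan, region, !regionHasRow);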
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7b4ccaf7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7b4ccaf7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7b4ccaf7

Branch: refs/heads/system-catalog
Commit: 7b4ccaf7798956a75c0ff4df542ded5259172690
Parents: 39c0d1d
Author: Thomas D'Silva <tdsi...@apache.org>
Authored: Thu Dec 28 20:12:29 2017 -0800
Committer: Thomas D'Silva <tdsi...@apache.org>
Committed: Thu Dec 28 20:12:29 2017 -0800

----------------------------------------------------------------------
 .../end2end/ExplainPlanWithStatsEnabledIT.java  |   2 +-
 .../phoenix/end2end/TableDDLPermissionsIT.java  |   2 +-
 .../coprocessor/MetaDataEndpointImpl.java       | 208 +++++++++++++------
 .../apache/phoenix/coprocessor/ViewFinder.java  |   9 +-
 .../coprocessor/MetaDataEndpointImplTest.java   |   4 +-
 5 files changed, 155 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7b4ccaf7/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
index bfc6819..bcf8cd8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
@@ -604,7 +604,7 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
         assertTrue(info.getEstimateInfoTs() > 0);
 
         conn.createStatement()
-                .execute("ALTER TABLE " + viewName + " SET USE_STATS_FOR_PARALLELIZATION=true");
+                .execute("ALTER VIEW " + viewName + " SET USE_STATS_FOR_PARALLELIZATION=true");
         sql = "SELECT COUNT(*) FROM " + viewName;
         // query the view
         rs = conn.createStatement().executeQuery(sql);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7b4ccaf7/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
index 2e78cce..0130f35 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java
@@ -167,7 +167,7 @@ public class TableDDLPermissionsIT{
 
     @Parameters(name = "isNamespaceMapped={0}") // name is used by failsafe as file name in reports
     public static Collection<Boolean> data() {
-        return Arrays.asList(true/*, false*/);
+        return Arrays.asList(true, false);
     }
 
     @BeforeClass

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7b4ccaf7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index b9dbc20..cb08b2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -55,6 +55,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
@@ -118,6 +119,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
@@ -2246,6 +2248,81 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
         }
     }
+
+    public class ScannerAdaptor {
+        private boolean useHTable;
+        private RegionScanner regionScanner;
+        private ResultScanner resultScanner;
+        private List<Cell> results;
+        Result result;
+
+        public ScannerAdaptor(Scan scan, Region region, boolean useHTable) throws IOException {
+            this.useHTable = useHTable;
+            this.results = Lists.newArrayList();
+            if (useHTable) {
+                HTableInterface hTable = env.getTable(SchemaUtil
+                        .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
+                resultScanner = hTable.getScanner(scan);
+            } else {
+                regionScanner = region.getScanner(scan);
+            }
+        }
+
+        public boolean next() throws IOException {
+            if (useHTable) {
+                result = resultScanner.next();
+                if (result==null) {
+                    return false;
+                }
+            } else {
+                results.clear();
+                regionScanner.next(results);
+                if (results.isEmpty()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        public Delete getRowDelete(long clientTimestamp) {
+            if (useHTable) {
+                return new Delete(result.getRow(), clientTimestamp);
+            } else {
+                Cell cell = results.get(0);
+                Delete delete = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), clientTimestamp);
+                return delete;
+            }
+        }
+
+        public Cell getLinkType() {
+            if (useHTable) {
+                return result.getColumnLatest(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
+            } else {
+                Cell kv = results.get(LINK_TYPE_INDEX);
+                if (Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
+                        LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0) {
+                    return kv;
+                }
+                return null;
+            }
+        }
+
+        public Cell getParentTenantId() {
+            if (useHTable) {
+                return result.getColumnLatest(TABLE_FAMILY_BYTES, PARENT_TENANT_ID_BYTES);
+            } else {
+                return MetaDataUtil.getCell(results, PARENT_TENANT_ID_BYTES);
+            }
+        }
+
+        public void close() throws IOException {
+            if (useHTable) {
+                resultScanner.close();
+            } else {
+                regionScanner.close();
+            }
+        }
+    }
 
     private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName, byte[] tableName,
         byte[] parentTableName, PTableType tableType, List<Mutation> catalogMutations,
@@ -2289,80 +2366,43 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             // use the table to generate the Delete markers.
             Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
             List<byte[]> indexNames = Lists.newArrayList();
-            List<Cell> results = Lists.newArrayList();
-            try (RegionScanner scanner = region.getScanner(scan);) {
-                scanner.next(results);
-                if (results.isEmpty()) { // Should not be possible
+            RegionScanner scanner = region.getScanner(scan);
+            boolean regionHasRow = region.getRegionInfo().containsRow(key);
+            ScannerAdaptor scannerAdaptor = new ScannerAdaptor(scan, region, !regionHasRow);
+            try {
+                if (!scannerAdaptor.next()) {
                     return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
                             EnvironmentEdgeManager.currentTimeMillis(), null);
                 }
-                if (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) {
-                    // Handle any child views that exist
-                    TableViewFinderResult tableViewFinderResult = new TableViewFinderResult();
-                    findAllChildViews(tenantId, table.getSchemaName().getBytes(), table.getTableName().getBytes(), tableViewFinderResult);
-                    if (tableViewFinderResult.hasViews()) {
-                        if (isCascade) {
-                            // Recursively delete views adding the mutations to delete child views to rowsToDelete
-                            for (TableInfo tableInfo : tableViewFinderResult.getResults()) {
-                                byte[] viewTenantId = tableInfo.getTenantId();
-                                byte[] viewSchemaName = tableInfo.getSchemaName();
-                                byte[] viewName = tableInfo.getTableName();
-                                byte[] viewKey = tableInfo.getRowKeyPrefix();
-                                Delete delete = new Delete(tableInfo.getRowKeyPrefix(), clientTimeStamp);
-                                catalogMutations.add(delete);
-                                MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName,
-                                    viewName, null, PTableType.VIEW, catalogMutations, childLinkMutations, invalidateList,
-                                    tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
-                                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
-                                    return result;
-                                }
-                            }
-                        } else {
-                            // DROP without CASCADE on tables with child views is not permitted
-                            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
-                                    EnvironmentEdgeManager.currentTimeMillis(), null);
-                        }
-                    }
-                }
-
-                // Add to list of HTables to delete, unless it's a view or its a shared index
-                if (tableType != PTableType.VIEW && table.getViewIndexId()==null) {
-                    tableNamesToDelete.add(table.getPhysicalName().getBytes());
-                }
-                else {
-                    sharedTablesToDelete.add(new SharedTableState(table));
-                }
-                invalidateList.add(cacheKey);
-                byte[][] rowKeyMetaData = new byte[5][];
+                MetaDataMutationResult result = processChildViews(tenantId, catalogMutations, childLinkMutations,
+                    invalidateList, tableNamesToDelete, sharedTablesToDelete, isCascade, clientVersion, clientTimeStamp,
+                    cacheKey, table);
+                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                    return result;
+                }
                 do {
-                    Cell kv = results.get(LINK_TYPE_INDEX);
-                    int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), 0, rowKeyMetaData);
-                    if (nColumns == 5
-                            && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0
-                            && Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
-                                LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0) {
-                        LinkType linkType = LinkType.fromSerializedValue(kv.getValueArray()[kv.getValueOffset()]);
-                        if (rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0 && linkType == LinkType.INDEX_TABLE) {
+                    Cell linkTypeCell = scannerAdaptor.getLinkType();
+                    if (linkTypeCell!=null) {
+                        LinkType linkType = LinkType.fromSerializedValue(linkTypeCell.getValueArray()[linkTypeCell.getValueOffset()]);
+                        if (linkType == LinkType.INDEX_TABLE) {
+                            byte[][] rowKeyMetaData = new byte[5][];
+                            getVarChars(linkTypeCell.getRowArray(), linkTypeCell.getRowOffset(), linkTypeCell.getRowLength(), 0, rowKeyMetaData);
                             indexNames.add(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                        } else if (linkType == LinkType.PARENT_TABLE || linkType == LinkType.PHYSICAL_TABLE) {
+                        } else if (table.getType() == PTableType.VIEW && (linkType == LinkType.PARENT_TABLE || linkType == LinkType.PHYSICAL_TABLE)) {
                             // delete parent->child link for views
-                            Cell parentTenantIdCell = MetaDataUtil.getCell(results, PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES);
+                            Cell parentTenantIdCell = scannerAdaptor.getParentTenantId();
                             PName parentTenantId = parentTenantIdCell!=null ? PNameFactory.newName(parentTenantIdCell.getValueArray(), parentTenantIdCell.getValueOffset(), parentTenantIdCell.getValueLength()) : null;
                             byte[] linkKey = MetaDataUtil.getChildLinkKey(parentTenantId, table.getParentSchemaName(), table.getParentTableName(), table.getTenantId(), table.getName());
                             Delete linkDelete = new Delete(linkKey, clientTimeStamp);
                             childLinkMutations.add(linkDelete);
                         }
                     }
-                    // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
-                    // FIXME: the version of the Delete constructor without the lock args was introduced
-                    // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
-                    // of the client.
-                    Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
-                    catalogMutations.add(delete);
-                    results.clear();
-                    scanner.next(results);
-                } while (!results.isEmpty());
+                    catalogMutations.add(scannerAdaptor.getRowDelete(clientTimeStamp));
+                } while (scannerAdaptor.next());
+            }
+            finally {
+                scanner.close();
             }
 
             // Recursively delete indexes
@@ -2386,6 +2426,52 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete);
     }
 
+    private MetaDataMutationResult processChildViews(byte[] tenantId, List<Mutation> catalogMutations,
+            List<Mutation> childLinkMutations, List<ImmutableBytesPtr> invalidateList, List<byte[]> tableNamesToDelete,
+            List<SharedTableState> sharedTablesToDelete, boolean isCascade, int clientVersion, long clientTimeStamp,
+            ImmutableBytesPtr cacheKey, PTable table) throws IOException, SQLException {
+        PTableType tableType = table.getType();
+        if (tableType == PTableType.VIEW || tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) {
+            // Handle any child views that exist
+            TableViewFinderResult tableViewFinderResult = new TableViewFinderResult();
+            findAllChildViews(tenantId, table.getSchemaName().getBytes(), table.getTableName().getBytes(), tableViewFinderResult);
+            if (tableViewFinderResult.hasViews()) {
+                if (isCascade) {
+                    // Recursively delete views adding the mutations to delete child views to rowsToDelete
+                    for (TableInfo tableInfo : tableViewFinderResult.getResults()) {
+                        byte[] viewTenantId = tableInfo.getTenantId();
+                        byte[] viewSchemaName = tableInfo.getSchemaName();
+                        byte[] viewName = tableInfo.getTableName();
+                        byte[] viewKey = tableInfo.getRowKeyPrefix();
+                        Delete delete = new Delete(tableInfo.getRowKeyPrefix(), clientTimeStamp);
+                        catalogMutations.add(delete);
+                        MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName,
+                            viewName, null, PTableType.VIEW, catalogMutations, childLinkMutations, invalidateList,
+                            tableNamesToDelete, sharedTablesToDelete, isCascade, clientVersion);
+                        if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                            return result;
+                        }
+                    }
+                } else {
+                    // DROP without CASCADE on tables with child views is not permitted
+                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
+                }
+            }
+        }
+
+        // Add to list of HTables to delete, unless it's a view or its a shared index
+        if (tableType != PTableType.VIEW && table.getViewIndexId()==null) {
+            tableNamesToDelete.add(table.getPhysicalName().getBytes());
+        }
+        else {
+            sharedTablesToDelete.add(new SharedTableState(table));
+        }
+        invalidateList.add(cacheKey);
+        return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
+                EnvironmentEdgeManager.currentTimeMillis(), null);
+    }
+
     private static interface ColumnMutator {
 
         MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7b4ccaf7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ViewFinder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ViewFinder.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ViewFinder.java
index cde7a01..7389437 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ViewFinder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ViewFinder.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.LinkedHashMultimap;
@@ -81,12 +82,8 @@ class ViewFinder {
         if (linkType==PTable.LinkType.INDEX_TABLE || linkType==PTable.LinkType.EXCLUDED_COLUMN) {
             throw new IllegalArgumentException("findAllRelatives does not support link type "+linkType);
         }
-        Scan scan = new Scan();
-        byte[] startRow = ByteUtil.concat(SchemaUtil.getTableKey(tenantId, schema, table), SEPARATOR_BYTE_ARRAY);
-        byte[] stopRow = ByteUtil.nextKey(startRow);
-        scan.setStartRow(startRow);
-        scan.setStopRow(stopRow);
-        scan.setTimeRange(0, timestamp);
+        byte[] key = SchemaUtil.getTableKey(tenantId, schema, table);
+        Scan scan = MetaDataUtil.newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP, timestamp);
         SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES,
             CompareFilter.CompareOp.EQUAL,
             linkType.getSerializedValueAsByteArray());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7b4ccaf7/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/MetaDataEndpointImplTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/MetaDataEndpointImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/MetaDataEndpointImplTest.java
index fbde4b7..4803839 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/MetaDataEndpointImplTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/MetaDataEndpointImplTest.java
@@ -234,7 +234,9 @@ public class MetaDataEndpointImplTest extends ParallelStatsDisabledIT {
 
         // now lets drop the parent table
         conn.createStatement().execute("DROP TABLE " + baseTable + " CASCADE");
-        // the grand child should no longer exist
+        // the tables should no longer exist
+        PhoenixRuntime.getTableNoCache(conn, baseTable);
+        PhoenixRuntime.getTableNoCache(conn, child);
         PhoenixRuntime.getTableNoCache(conn, grandChild);
     }
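
Condensed for readability, the rewritten delete loop in doDropTable drives the adaptor roughly as below. This is a simplified sketch of the hunk above rather than code from the commit (in the patch itself the finally block closes the separately opened RegionScanner):

    // Sketch: ScannerAdaptor hides whether rows come from the local region or
    // from an HTable scan of SYSTEM.CATALOG hosted on another region server.
    ScannerAdaptor adaptor = new ScannerAdaptor(scan, region,
            !region.getRegionInfo().containsRow(key));
    try {
        if (!adaptor.next()) {
            return;  // the real method returns a TABLE_NOT_FOUND MetaDataMutationResult here
        }
        do {
            Cell linkTypeCell = adaptor.getLinkType();  // null unless the row is a link row
            // ... collect index names and parent->child link deletes from linkTypeCell ...
            catalogMutations.add(adaptor.getRowDelete(clientTimeStamp));
        } while (adaptor.next());
    } finally {
        adaptor.close();
    }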