Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.4 3d196a2e1 -> 45a9ce513
PHOENIX-5024 - Cleanup anonymous inner classes in PostDDLCompiler Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/45a9ce51 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/45a9ce51 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/45a9ce51 Branch: refs/heads/4.x-HBase-1.4 Commit: 45a9ce51341fd38583e4269ba16a16e863b22d22 Parents: 3d196a2 Author: Geoffrey Jacoby <gjac...@apache.org> Authored: Fri Nov 16 09:55:49 2018 -0800 Committer: Geoffrey Jacoby <gjac...@apache.org> Committed: Fri Nov 16 10:54:10 2018 -0800 ---------------------------------------------------------------------- .../apache/phoenix/compile/PostDDLCompiler.java | 478 ++++++++++--------- 1 file changed, 258 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/45a9ce51/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 709534e..a74c5f1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -87,248 +87,286 @@ public class PostDDLCompiler { final long timestamp) throws SQLException { PhoenixStatement statement = new PhoenixStatement(connection); final StatementContext context = new StatementContext( - statement, - new ColumnResolver() { + statement, + new MultipleTableRefColumnResolver(tableRefs), + scan, + new SequenceManager(statement)); + return new PostDDLMutationPlan(context, tableRefs, timestamp, emptyCF, deleteList, projectCFs); + } - @Override - public List<TableRef> getTables() { - return tableRefs; - } + private static class MultipleTableRefColumnResolver implements ColumnResolver { - @Override - public TableRef resolveTable(String schemaName, String tableName) throws SQLException { - throw new UnsupportedOperationException(); - } + private final List<TableRef> tableRefs; - @Override - public ColumnRef resolveColumn(String schemaName, String tableName, String colName) - throws SQLException { - throw new UnsupportedOperationException(); - } + public MultipleTableRefColumnResolver(List<TableRef> tableRefs) { + this.tableRefs = tableRefs; + } - @Override - public List<PFunction> getFunctions() { - return Collections.<PFunction>emptyList(); - } - - @Override - public PFunction resolveFunction(String functionName) - throws SQLException { - throw new FunctionNotFoundException(functionName); - } - - @Override - public boolean hasUDFs() { - return false; - } - - @Override - public PSchema resolveSchema(String schemaName) throws SQLException { - throw new SchemaNotFoundException(schemaName); - } - - @Override - public List<PSchema> getSchemas() { - throw new UnsupportedOperationException(); - } - - }, - scan, - new SequenceManager(statement)); - return new BaseMutationPlan(context, Operation.UPSERT /* FIXME */) { - - @Override - public MutationState execute() throws SQLException { - if (tableRefs.isEmpty()) { - return new MutationState(0, 1000, connection); - } - boolean wasAutoCommit = connection.getAutoCommit(); - try { - connection.setAutoCommit(true); - SQLException sqlE = null; - /* - * Handles: - * 1) deletion of all rows for a DROP TABLE and subsequently deletion of all rows for a DROP INDEX; - * 2) deletion of all column values for a ALTER TABLE DROP COLUMN - * 3) updating the necessary rows to have an empty KV - * 4) updating table stats - */ - long totalMutationCount = 0; - for (final TableRef tableRef : tableRefs) { - Scan scan = ScanUtil.newScan(context.getScan()); - SelectStatement select = SelectStatement.COUNT_ONE; - // We need to use this tableRef - ColumnResolver resolver = new ColumnResolver() { - @Override - public List<TableRef> getTables() { - return Collections.singletonList(tableRef); - } - - @Override - public java.util.List<PFunction> getFunctions() { - return Collections.emptyList(); - }; - - @Override - public TableRef resolveTable(String schemaName, String tableName) - throws SQLException { - throw new UnsupportedOperationException(); - } - @Override - public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException { - PColumn column = tableName != null - ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName) - : tableRef.getTable().getColumnForColumnName(colName); - return new ColumnRef(tableRef, column.getPosition()); - } - - @Override - public PFunction resolveFunction(String functionName) throws SQLException { - throw new UnsupportedOperationException(); - }; - - @Override - public boolean hasUDFs() { - return false; - } + @Override + public List<TableRef> getTables() { + return tableRefs; + } - @Override - public List<PSchema> getSchemas() { - throw new UnsupportedOperationException(); - } + @Override + public TableRef resolveTable(String schemaName, String tableName) throws SQLException { + throw new UnsupportedOperationException(); + } - @Override - public PSchema resolveSchema(String schemaName) throws SQLException { - throw new SchemaNotFoundException(schemaName); - } - }; - PhoenixStatement statement = new PhoenixStatement(connection); - StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); - long ts = timestamp; - // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp. - // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions - // in this case, so maybe this is ok. - if (ts!=HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) { - ts = TransactionUtil.convertToNanoseconds(ts); - } - ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts); - if (emptyCF != null) { - scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF); - scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst()); - } - ServerCache cache = null; - try { - if (deleteList != null) { - if (deleteList.isEmpty()) { - scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE); - // In the case of a row deletion, add index metadata so mutable secondary indexing works - /* TODO: we currently manually run a scan to delete the index data here - ImmutableBytesWritable ptr = context.getTempPtr(); - tableRef.getTable().getIndexMaintainers(ptr); - if (ptr.getLength() > 0) { - IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(context.getScanRanges(), ptr); - byte[] uuidValue = cache.getId(); - scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - } - */ - } else { - // In the case of the empty key value column family changing, do not send the index - // metadata, as we're currently managing this from the client. It's possible for the - // data empty column family to stay the same, while the index empty column family - // changes. - PColumn column = deleteList.get(0); - byte[] cq = column.getColumnQualifierBytes(); - if (emptyCF == null) { - scan.addColumn(column.getFamilyName().getBytes(), cq); - } - scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes()); - scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq); - } - } - List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size()); - if (projectCFs == null) { - for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) { - columnFamilies.add(family.getName().getBytes()); + @Override + public ColumnRef resolveColumn(String schemaName, String tableName, String colName) + throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public List<PFunction> getFunctions() { + return Collections.<PFunction>emptyList(); + } + + @Override + public PFunction resolveFunction(String functionName) + throws SQLException { + throw new FunctionNotFoundException(functionName); + } + + @Override + public boolean hasUDFs() { + return false; + } + + @Override + public PSchema resolveSchema(String schemaName) throws SQLException { + throw new SchemaNotFoundException(schemaName); + } + + @Override + public List<PSchema> getSchemas() { + throw new UnsupportedOperationException(); + } + + } + + private class PostDDLMutationPlan extends BaseMutationPlan { + + private final StatementContext context; + private final List<TableRef> tableRefs; + private final long timestamp; + private final byte[] emptyCF; + private final List<PColumn> deleteList; + private final List<byte[]> projectCFs; + + public PostDDLMutationPlan(StatementContext context, List<TableRef> tableRefs, long timestamp, byte[] emptyCF, List<PColumn> deleteList, List<byte[]> projectCFs) { + super(context, Operation.UPSERT); + this.context = context; + this.tableRefs = tableRefs; + this.timestamp = timestamp; + this.emptyCF = emptyCF; + this.deleteList = deleteList; + this.projectCFs = projectCFs; + } + + @Override + public MutationState execute() throws SQLException { + if (tableRefs.isEmpty()) { + return new MutationState(0, 1000, connection); + } + boolean wasAutoCommit = connection.getAutoCommit(); + try { + connection.setAutoCommit(true); + SQLException sqlE = null; + /* + * Handles: + * 1) deletion of all rows for a DROP TABLE and subsequently deletion of all rows for a DROP INDEX; + * 2) deletion of all column values for a ALTER TABLE DROP COLUMN + * 3) updating the necessary rows to have an empty KV + * 4) updating table stats + */ + long totalMutationCount = 0; + for (final TableRef tableRef : tableRefs) { + Scan scan = ScanUtil.newScan(context.getScan()); + SelectStatement select = SelectStatement.COUNT_ONE; + // We need to use this tableRef + ColumnResolver resolver = new SingleTableRefColumnResolver(tableRef); + PhoenixStatement statement = new PhoenixStatement(connection); + StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); + long ts = timestamp; + // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp. + // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions + // in this case, so maybe this is ok. + if (ts!= HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) { + ts = TransactionUtil.convertToNanoseconds(ts); + } + ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts); + if (emptyCF != null) { + scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst()); + } + ServerCache cache = null; + try { + if (deleteList != null) { + if (deleteList.isEmpty()) { + scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE); + // In the case of a row deletion, add index metadata so mutable secondary indexing works + /* TODO: we currently manually run a scan to delete the index data here + ImmutableBytesWritable ptr = context.getTempPtr(); + tableRef.getTable().getIndexMaintainers(ptr); + if (ptr.getLength() > 0) { + IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); + cache = client.addIndexMetadataCache(context.getScanRanges(), ptr); + byte[] uuidValue = cache.getId(); + scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); } + */ } else { - for (byte[] projectCF : projectCFs) { - columnFamilies.add(projectCF); + // In the case of the empty key value column family changing, do not send the index + // metadata, as we're currently managing this from the client. It's possible for the + // data empty column family to stay the same, while the index empty column family + // changes. + PColumn column = deleteList.get(0); + byte[] cq = column.getColumnQualifierBytes(); + if (emptyCF == null) { + scan.addColumn(column.getFamilyName().getBytes(), cq); } + scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes()); + scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq); } - // Need to project all column families into the scan, since we haven't yet created our empty key value - RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY); - context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY); - // Explicitly project these column families and don't project the empty key value, - // since at this point we haven't added the empty key value everywhere. - if (columnFamilies != null) { - scan.getFamilyMap().clear(); - for (byte[] family : columnFamilies) { - scan.addFamily(family); - } - projector = new RowProjector(projector,false); + } + List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size()); + if (projectCFs == null) { + for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) { + columnFamilies.add(family.getName().getBytes()); } - // Ignore exceptions due to not being able to resolve any view columns, - // as this just means the view is invalid. Continue on and try to perform - // any other Post DDL operations. - try { - // Since dropping a VIEW does not affect the underlying data, we do - // not need to pass through the view statement here. - WhereCompiler.compile(context, select); // Push where clause into scan - } catch (ColumnFamilyNotFoundException e) { - continue; - } catch (ColumnNotFoundException e) { - continue; - } catch (AmbiguousColumnException e) { - continue; + } else { + for (byte[] projectCF : projectCFs) { + columnFamilies.add(projectCF); + } + } + // Need to project all column families into the scan, since we haven't yet created our empty key value + RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY); + context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY); + // Explicitly project these column families and don't project the empty key value, + // since at this point we haven't added the empty key value everywhere. + if (columnFamilies != null) { + scan.getFamilyMap().clear(); + for (byte[] family : columnFamilies) { + scan.addFamily(family); } - QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null, - OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null); + projector = new RowProjector(projector,false); + } + // Ignore exceptions due to not being able to resolve any view columns, + // as this just means the view is invalid. Continue on and try to perform + // any other Post DDL operations. + try { + // Since dropping a VIEW does not affect the underlying data, we do + // not need to pass through the view statement here. + WhereCompiler.compile(context, select); // Push where clause into scan + } catch (ColumnFamilyNotFoundException e) { + continue; + } catch (ColumnNotFoundException e) { + continue; + } catch (AmbiguousColumnException e) { + continue; + } + QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null, + OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null); + try { + ResultIterator iterator = plan.iterator(); try { - ResultIterator iterator = plan.iterator(); + Tuple row = iterator.next(); + ImmutableBytesWritable ptr = context.getTempPtr(); + totalMutationCount += (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr); + } catch (SQLException e) { + sqlE = e; + } finally { try { - Tuple row = iterator.next(); - ImmutableBytesWritable ptr = context.getTempPtr(); - totalMutationCount += (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr); + iterator.close(); } catch (SQLException e) { - sqlE = e; + if (sqlE == null) { + sqlE = e; + } else { + sqlE.setNextException(e); + } } finally { - try { - iterator.close(); - } catch (SQLException e) { - if (sqlE == null) { - sqlE = e; - } else { - sqlE.setNextException(e); - } - } finally { - if (sqlE != null) { - throw sqlE; - } + if (sqlE != null) { + throw sqlE; } } - } catch (TableNotFoundException e) { - // Ignore and continue, as HBase throws when table hasn't been written to - // FIXME: Remove if this is fixed in 0.96 - } - } finally { - if (cache != null) { // Remove server cache if there is one - cache.close(); } + } catch (TableNotFoundException e) { + // Ignore and continue, as HBase throws when table hasn't been written to + // FIXME: Remove if this is fixed in 0.96 } - - } - final long count = totalMutationCount; - return new MutationState(1, 1000, connection) { - @Override - public long getUpdateCount() { - return count; + } finally { + if (cache != null) { // Remove server cache if there is one + cache.close(); } - }; - } finally { - if (!wasAutoCommit) connection.setAutoCommit(wasAutoCommit); + } + } + final long count = totalMutationCount; + return new MutationState(1, 1000, connection) { + @Override + public long getUpdateCount() { + return count; + } + }; + } finally { + if (!wasAutoCommit) connection.setAutoCommit(wasAutoCommit); + } + } + + private class SingleTableRefColumnResolver implements ColumnResolver { + private final TableRef tableRef; + + public SingleTableRefColumnResolver(TableRef tableRef) { + this.tableRef = tableRef; + } + + @Override + public List<TableRef> getTables() { + return Collections.singletonList(tableRef); + } + + @Override + public List<PFunction> getFunctions() { + return Collections.emptyList(); + } + + ; + + @Override + public TableRef resolveTable(String schemaName, String tableName) + throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException { + PColumn column = tableName != null + ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName) + : tableRef.getTable().getColumnForColumnName(colName); + return new ColumnRef(tableRef, column.getPosition()); + } + + @Override + public PFunction resolveFunction(String functionName) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasUDFs() { + return false; + } + + @Override + public List<PSchema> getSchemas() { + throw new UnsupportedOperationException(); + } + + @Override + public PSchema resolveSchema(String schemaName) throws SQLException { + throw new SchemaNotFoundException(schemaName); } - }; + } } } \ No newline at end of file