http://git-wip-us.apache.org/repos/asf/phoenix/blob/fccbe56a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropViewIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropViewIT.java deleted file mode 100644 index e52f8cc..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropViewIT.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.end2end.index; - -import static org.apache.phoenix.util.TestUtil.HBASE_NATIVE; -import static org.apache.phoenix.util.TestUtil.HBASE_NATIVE_SCHEMA_NAME; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.fail; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.util.Map; -import java.util.Properties; - -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; -import org.apache.phoenix.end2end.BaseHBaseManagedTimeTableReuseIT; -import org.apache.phoenix.end2end.Shadower; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.SchemaUtil; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.collect.Maps; - -public class DropViewIT extends BaseHBaseManagedTimeIT { - private static final byte[] HBASE_NATIVE_BYTES = SchemaUtil.getTableNameAsBytes(HBASE_NATIVE_SCHEMA_NAME, HBASE_NATIVE); - private static final byte[] FAMILY_NAME = Bytes.toBytes(SchemaUtil.normalizeIdentifier("1")); - - @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) - @BeforeClass - public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(1); - // Drop the HBase table metadata for this test - props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); - // Must update config before starting server - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - } - - @Test - public void testDropViewKeepsHTable() throws Exception { - HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TEST_PROPERTIES).getAdmin(); 
- try { - try { - admin.disableTable(HBASE_NATIVE_BYTES); - admin.deleteTable(HBASE_NATIVE_BYTES); - } catch (org.apache.hadoop.hbase.TableNotFoundException e) { - } - @SuppressWarnings("deprecation") - HTableDescriptor descriptor = new HTableDescriptor(HBASE_NATIVE_BYTES); - HColumnDescriptor columnDescriptor = new HColumnDescriptor(FAMILY_NAME); - columnDescriptor.setKeepDeletedCells(true); - descriptor.addFamily(columnDescriptor); - admin.createTable(descriptor); - } finally { - admin.close(); - } - - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.createStatement().execute("create view " + HBASE_NATIVE + - " (uint_key unsigned_int not null," + - " ulong_key unsigned_long not null," + - " string_key varchar not null,\n" + - " \"1\".uint_col unsigned_int," + - " \"1\".ulong_col unsigned_long" + - " CONSTRAINT pk PRIMARY KEY (uint_key, ulong_key, string_key))\n" + - HColumnDescriptor.DATA_BLOCK_ENCODING + "='" + DataBlockEncoding.NONE + "'"); - conn.createStatement().execute("drop view " + HBASE_NATIVE); - - admin = driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).getAdmin(); - try { - try { - admin.disableTable(HBASE_NATIVE_BYTES); - admin.deleteTable(HBASE_NATIVE_BYTES); - } catch (org.apache.hadoop.hbase.TableNotFoundException e) { - fail(); // The underlying HBase table should still exist - } - } finally { - admin.close(); - } - } -} -
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fccbe56a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index bcfdf20..770917f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -83,7 +83,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; -import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -116,7 +115,11 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest; @@ -134,7 +137,13 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; +import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.Expression; +import 
org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.expression.ProjectedColumnExpression; +import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -142,18 +151,21 @@ import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.metrics.Metrics; import org.apache.phoenix.parse.LiteralParseNode; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PFunction.FunctionArgument; +import org.apache.phoenix.parse.ParseNode; +import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; +import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PColumnImpl; @@ -169,6 +181,7 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.TableRef; import 
org.apache.phoenix.schema.stats.PTableStats; import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.schema.tuple.ResultTuple; @@ -1420,6 +1433,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return; } List<byte[]> tableNamesToDelete = Lists.newArrayList(); + List<SharedTableState> sharedTablesToDelete = Lists.newArrayList(); byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata); byte[] lockTableName = parentTableName == null ? tableName : parentTableName; byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName); @@ -1444,7 +1458,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso result = doDropTable(key, tenantIdBytes, schemaName, tableName, parentTableName, PTableType.fromSerializedValue(tableType), tableMetadata, - invalidateList, locks, tableNamesToDelete, isCascade); + invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, isCascade); if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { done.run(MetaDataMutationResult.toProto(result)); return; @@ -1475,7 +1489,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName, byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete, List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, - List<byte[]> tableNamesToDelete, boolean isCascade) throws IOException, SQLException { + List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade) throws IOException, SQLException { long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete); @@ -1550,7 +1564,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso acquireLock(region, viewKey, locks); MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName, 
viewName, null, PTableType.VIEW, rowsToDelete, invalidateList, locks, - tableNamesToDelete, false); + tableNamesToDelete, sharedTablesToDelete, false); if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { return result; } } } @@ -1562,9 +1576,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } - if (tableType != PTableType.VIEW) { // Add to list of HTables to delete, unless it's a view + // Add to list of HTables to delete, unless it's a view or its a shared index + if (tableType != PTableType.VIEW && table.getViewIndexId()==null) { tableNamesToDelete.add(table.getName().getBytes()); } + else { + sharedTablesToDelete.add(new SharedTableState(table)); + } invalidateList.add(cacheKey); byte[][] rowKeyMetaData = new byte[5][]; do { @@ -1601,7 +1619,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso acquireLock(region, indexKey, locks); MetaDataMutationResult result = doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX, - rowsToDelete, invalidateList, locks, tableNamesToDelete, false); + rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false); if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { return result; } @@ -1708,7 +1726,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region, invalidateList, locks, clientTimeStamp); - if (result != null) { + // if the update mutation caused tables to be deleted, the mutation code returned will be MutationCode.TABLE_ALREADY_EXISTS + if (result != null && result.getMutationCode()!=MutationCode.TABLE_ALREADY_EXISTS) { return result; } region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); @@ -1719,8 +1738,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Get client timeStamp from mutations, since it 
may get updated by the // mutateRowsWithLocks call long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata); - return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, - null); + // if the update mutation caused tables to be deleted just return the result which will contain the table to be deleted + return result != null ? result : new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null); } finally { region.releaseRowLocks(locks); } @@ -1771,6 +1790,24 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso this.offset = lowestOrdinalPos; } + private void addColumn(byte[] columnKey) { + columnKeys.add(columnKey); + } + + private void dropColumn(byte[] columnKey) { + // Check if an entry for this column key exists + int index = -1; + for (int i = 0; i < columnKeys.size(); i++) { + if (Bytes.equals(columnKeys.get(i), columnKey)) { + index = i; + break; + } + } + if (index != -1) { + columnKeys.remove(index); + } + } + private void addColumn(byte[] columnKey, int position) { checkArgument(position >= this.offset); int index = position - offset; @@ -1857,7 +1894,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return false; } - private MetaDataMutationResult addRowsToChildViews(PTable basePhysicalTable, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName, + private MetaDataMutationResult addColumnsToChildViews(PTable basePhysicalTable, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName, List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, TableViewFinderResult childViewsResult, HRegion region, List<RowLock> locks) throws IOException, SQLException { List<PutWithOrdinalPosition> columnPutsForBaseTable = new ArrayList<>(tableMetadata.size()); @@ -1866,7 +1903,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso if (m instanceof Put) { byte[][] rkmd = new byte[5][]; int pkCount = getVarChars(m.getRow(), rkmd); - if (m instanceof Put && pkCount > COLUMN_NAME_INDEX + if (pkCount > COLUMN_NAME_INDEX && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0 && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) { columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES))); @@ -2039,71 +2076,235 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view, deltaNumPkColsSoFar); - // Update the view header rows with new column counts. - Put viewHeaderRowPut = new Put(viewKey, clientTimeStamp); - if (!isDivergedView(view) && columnsAddedToBaseTable > 0) { - // Base column count should only be updated for diverged views. - int oldBaseColumnCount = view.getBaseColumnCount(); - byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()]; - PInteger.INSTANCE.getCodec().encodeInt(oldBaseColumnCount + columnsAddedToBaseTable, baseColumnCountPtr, 0); - viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp, baseColumnCountPtr); - } - - if (columnsAddedToView > 0) { - byte[] columnCountPtr = new byte[PInteger.INSTANCE.getByteSize()]; - PInteger.INSTANCE.getCodec().encodeInt(numCols + columnsAddedToView, columnCountPtr, 0); - viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, clientTimeStamp, columnCountPtr); - } - /* * Increment the sequence number by 1 if: * 1) For a diverged view, there were columns (pk columns) added to the view. * 2) For a non-diverged view if the base column count changed. 
- * */ boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0) || (!isDivergedView(view) && columnsAddedToBaseTable > 0); - if (changeSequenceNumber) { - byte[] viewSequencePtr = new byte[PLong.INSTANCE.getByteSize()]; - PLong.INSTANCE.getCodec().encodeLong(view.getSequenceNumber() + 1, viewSequencePtr, 0); - viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp, viewSequencePtr); - - mutationsForAddingColumnsToViews.add(viewHeaderRowPut); - - // only invalidate if the sequence number is about to change - invalidateList.add(new ImmutableBytesPtr(viewKey)); - - // Update the ordinal positions. The list would be non-empty only if the sequence - // number will change. - int i = 0; - for (byte[] columnKey : ordinalPositionList.columnKeys) { - int ordinalPosition = ordinalPositionList.getOrdinalPositionFromListIdx(i); - Put positionUpdatePut = new Put(columnKey, clientTimeStamp); - byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()]; - PInteger.INSTANCE.getCodec().encodeInt(ordinalPosition, ptr, 0); - positionUpdatePut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr); - mutationsForAddingColumnsToViews.add(positionUpdatePut); - i++; - } + updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews, + invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable, + viewKey, view, ordinalPositionList, numCols, changeSequenceNumber); + } + return null; + } + + /** + * Updates the base table column count, column count, sequence number and ordinal positions of + * columns of the view based on the columns being added or dropped. 
+ */ + private void updateViewHeaderRow(PTable basePhysicalTable, List<Mutation> tableMetadata, + List<Mutation> mutationsForAddingColumnsToViews, + List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, short viewColumnDelta, + short baseTableColumnDelta, byte[] viewKey, PTable view, + ColumnOrdinalPositionUpdateList ordinalPositionList, int numCols, boolean changeSequenceNumber) { + // Update the view header rows with new column counts. + Put viewHeaderRowPut = new Put(viewKey, clientTimeStamp); + if (!isDivergedView(view) && baseTableColumnDelta != 0) { + // Base column count should only be updated for non-diverged views. + int oldBaseColumnCount = view.getBaseColumnCount(); + byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()]; + PInteger.INSTANCE.getCodec().encodeInt(oldBaseColumnCount + baseTableColumnDelta, baseColumnCountPtr, 0); + viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp, baseColumnCountPtr); + } + + if (viewColumnDelta != 0) { + byte[] columnCountPtr = new byte[PInteger.INSTANCE.getByteSize()]; + PInteger.INSTANCE.getCodec().encodeInt(numCols + viewColumnDelta, columnCountPtr, 0); + viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, clientTimeStamp, columnCountPtr); + } + + if (changeSequenceNumber) { + byte[] viewSequencePtr = new byte[PLong.INSTANCE.getByteSize()]; + PLong.INSTANCE.getCodec().encodeLong(view.getSequenceNumber() + 1, viewSequencePtr, 0); + viewHeaderRowPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp, viewSequencePtr); + + mutationsForAddingColumnsToViews.add(viewHeaderRowPut); + + // only invalidate if the sequence number is about to change + invalidateList.add(new ImmutableBytesPtr(viewKey)); + + // Update the ordinal positions. The list would be non-empty only if the sequence + // number will change. 
+ int i = 0; + for (byte[] columnKey : ordinalPositionList.columnKeys) { + int ordinalPosition = ordinalPositionList.getOrdinalPositionFromListIdx(i); + Put positionUpdatePut = new Put(columnKey, clientTimeStamp); + byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()]; + PInteger.INSTANCE.getCodec().encodeInt(ordinalPosition, ptr, 0); + positionUpdatePut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr); + mutationsForAddingColumnsToViews.add(positionUpdatePut); + i++; } - - if (view.rowKeyOrderOptimizable()) { - UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, viewKey, clientTimeStamp); + } + + if (view.rowKeyOrderOptimizable()) { + UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, viewKey, clientTimeStamp); + } + + // if switching from from non tx to tx + if (!basePhysicalTable.isTransactional() && switchAttribute(basePhysicalTable, basePhysicalTable.isTransactional(), tableMetadata, TRANSACTIONAL_BYTES)) { + invalidateList.add(new ImmutableBytesPtr(viewKey)); + Put put = new Put(viewKey); + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + TRANSACTIONAL_BYTES, clientTimeStamp, PBoolean.INSTANCE.toBytes(true)); + mutationsForAddingColumnsToViews.add(put); + } + } + + private class ColumnFinder extends StatelessTraverseAllExpressionVisitor<Void> { + private boolean columnFound; + private final ColumnExpression columnExpression; + + public ColumnFinder(ColumnExpression columnExpression) { + this.columnExpression = columnExpression; + columnFound = false; + } + + private Void process(ColumnExpression expression) { + if (expression.equals(columnExpression)) { + columnFound = true; } - - // if switching from from non tx to tx - if (!basePhysicalTable.isTransactional() && switchAttribute(basePhysicalTable, basePhysicalTable.isTransactional(), tableMetadata, TRANSACTIONAL_BYTES)) { - invalidateList.add(new ImmutableBytesPtr(viewKey)); - Put 
put = new Put(viewKey); - put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - TRANSACTIONAL_BYTES, clientTimeStamp, PBoolean.INSTANCE.toBytes(true)); - mutationsForAddingColumnsToViews.add(put); + return null; + } + + @Override + public Void visit(KeyValueColumnExpression expression) { + return process(expression); + } + + @Override + public Void visit(RowKeyColumnExpression expression) { + return process(expression); + } + + @Override + public Void visit(ProjectedColumnExpression expression) { + return process(expression); + } + + public boolean getColumnFound() { + return columnFound; + } + } + + private MetaDataMutationResult dropColumnsFromChildViews(HRegion region, + PTable basePhysicalTable, List<RowLock> locks, List<Mutation> tableMetadata, + List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName, + List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, + TableViewFinderResult childViewsResult, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete) + throws IOException, SQLException { + List<Delete> columnDeletesForBaseTable = new ArrayList<>(tableMetadata.size()); + // Isolate the deletes relevant to dropping columns. Also figure out what kind of columns + // are being added. 
+ for (Mutation m : tableMetadata) { + if (m instanceof Delete) { + byte[][] rkmd = new byte[5][]; + int pkCount = getVarChars(m.getRow(), rkmd); + if (pkCount > COLUMN_NAME_INDEX + && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0 + && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) { + columnDeletesForBaseTable.add((Delete) m); + } } } + for (Result viewResult : childViewsResult.getResults()) { + short numColsDeleted = 0; + byte[][] rowViewKeyMetaData = new byte[3][]; + getVarChars(viewResult.getRow(), 3, rowViewKeyMetaData); + byte[] viewKey = + SchemaUtil.getTableKey( + rowViewKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX], + rowViewKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX], + rowViewKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]); + + // lock the rows corresponding to views so that no other thread can modify the view + // meta-data + RowLock viewRowLock = acquireLock(region, viewKey, locks); + PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock); + + ColumnOrdinalPositionUpdateList ordinalPositionList = + new ColumnOrdinalPositionUpdateList(); + int numCols = view.getColumns().size(); + int minDroppedColOrdinalPos = Integer.MAX_VALUE; + for (Delete columnDeleteForBaseTable : columnDeletesForBaseTable) { + PColumn existingViewColumn = null; + byte[][] rkmd = new byte[5][]; + getVarChars(columnDeleteForBaseTable.getRow(), rkmd); + String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]); + String columnFamily = + rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes + .toString(rkmd[FAMILY_NAME_INDEX]); + byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily); + try { + existingViewColumn = + columnFamily == null ? view.getColumn(columnName) : view + .getColumnFamily(columnFamily).getColumn(columnName); + } catch (ColumnFamilyNotFoundException e) { + // ignore since it means that the column family is not present for the column to + // be dropped. 
+ } catch (ColumnNotFoundException e) { + // ignore since it means the column is not present in the view + } + + // check if the view where expression contains the column being dropped and prevent + // it + if (existingViewColumn != null && view.getViewStatement() != null) { + ParseNode viewWhere = + new SQLParser(view.getViewStatement()).parseQuery().getWhere(); + PhoenixConnection conn=null; + try { + conn = QueryUtil.getConnection(env.getConfiguration()).unwrap( + PhoenixConnection.class); + } catch (ClassNotFoundException e) { + } + PhoenixStatement statement = new PhoenixStatement(conn); + TableRef baseTableRef = new TableRef(basePhysicalTable); + ColumnResolver columnResolver = FromCompiler.getResolver(baseTableRef); + StatementContext context = new StatementContext(statement, columnResolver); + Expression whereExpression = WhereCompiler.compile(context, viewWhere); + ColumnExpression colExpression = + new ColumnRef(baseTableRef, existingViewColumn.getPosition()) + .newColumnExpression(); + ColumnFinder columnFinder = new ColumnFinder(colExpression); + whereExpression.accept(columnFinder); + if (columnFinder.getColumnFound()) { + return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, + EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); + } + } + + minDroppedColOrdinalPos = + Math.min(getOrdinalPosition(view, existingViewColumn), + minDroppedColOrdinalPos); + if (existingViewColumn != null) { + --numColsDeleted; + if (ordinalPositionList.size() == 0) { + ordinalPositionList.setOffset(view.getBucketNum() == null ? 
1 : 0); + for (PColumn col : view.getColumns()) { + ordinalPositionList.addColumn(getColumnKey(viewKey, col)); + } + } + ordinalPositionList.dropColumn(columnKey); + Delete viewColumnDelete = new Delete(columnKey, clientTimeStamp); + mutationsForAddingColumnsToViews.add(viewColumnDelete); + // drop any view indexes that need this column + dropIndexes(view, region, invalidateList, locks, clientTimeStamp, + schemaName, view.getName().getBytes(), + mutationsForAddingColumnsToViews, existingViewColumn, + tableNamesToDelete, sharedTablesToDelete); + } + } + + updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews, + invalidateList, clientTimeStamp, numColsDeleted, numColsDeleted, viewKey, view, + ordinalPositionList, numCols, true); + } return null; } @@ -2332,7 +2533,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso EnvironmentEdgeManager.currentTimeMillis(), null); } else { mutationsForAddingColumnsToViews = new ArrayList<>(childViewsResult.getResults().size() * tableMetaData.size()); - MetaDataMutationResult mutationResult = addRowsToChildViews(table, tableMetaData, mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, clientTimeStamp, + MetaDataMutationResult mutationResult = addColumnsToChildViews(table, tableMetaData, mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, clientTimeStamp, childViewsResult, region, locks); // return if we were not able to add the column successfully if (mutationResult!=null) @@ -2555,10 +2756,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso public void dropColumn(RpcController controller, DropColumnRequest request, RpcCallback<MetaDataResponse> done) { List<Mutation> tableMetaData = null; - + final List<byte[]> tableNamesToDelete = Lists.newArrayList(); + final List<SharedTableState> sharedTablesToDelete = Lists.newArrayList(); try { tableMetaData = ProtobufUtil.getMutations(request); - 
final List<byte[]> tableNamesToDelete = Lists.newArrayList(); MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() { @Override public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData, @@ -2580,8 +2781,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso TableViewFinderResult childViewsResult = findChildViews(region, tenantId, table, PHYSICAL_TABLE_BYTES); if (childViewsResult.hasViews()) { - return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager - .currentTimeMillis(), null); + MetaDataMutationResult mutationResult = + dropColumnsFromChildViews(region, table, + locks, tableMetaData, additionalTableMetaData, + schemaName, tableName, invalidateList, + clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete); + // return if we were not able to drop the column successfully + if (mutationResult != null) return mutationResult; } } @@ -2633,42 +2839,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (columnToDelete.isViewReferenced()) { // Disallow deletion of column referenced in WHERE clause of view return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete); } - // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the index and then invalidate it - // Covered columns are deleted from the index by the client - PhoenixConnection connection = table.getIndexes().isEmpty() ? 
null : QueryUtil.getConnection(env.getConfiguration()).unwrap(PhoenixConnection.class); - for (PTable index : table.getIndexes()) { - try { - IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection); - // get the columns required for the index pk - Set<ColumnReference> indexColumns = indexMaintainer.getIndexedColumns(); - byte[] indexKey = - SchemaUtil.getTableKey(tenantId, index - .getSchemaName().getBytes(), index.getTableName().getBytes()); - // If index requires this column for its pk, then drop it - if (indexColumns.contains(new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete.getName().getBytes()))) { - // Since we're dropping the index, lock it to ensure - // that a change in index state doesn't - // occur while we're dropping it. - acquireLock(region, indexKey, locks); - // Drop the index table. The doDropTable will expand - // this to all of the table rows and invalidate the - // index table - additionalTableMetaData.add(new Delete(indexKey, clientTimeStamp)); - byte[] linkKey = MetaDataUtil.getParentLinkKey(tenantId, - schemaName, tableName, index.getTableName().getBytes()); - // Drop the link between the data table and the - // index table - additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp)); - doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index.getTableName().getBytes(), tableName, - index.getType(), additionalTableMetaData, invalidateList, locks, tableNamesToDelete, false); - // TODO: return in result? 
- } else { - invalidateList.add(new ImmutableBytesPtr(indexKey)); - } - } catch (ColumnNotFoundException e) { - } catch (AmbiguousColumnException e) { - } - } + // drop any indexes that need the column that is going to be dropped + dropIndexes(table, region, invalidateList, locks, + clientTimeStamp, schemaName, tableName, + additionalTableMetaData, columnToDelete, + tableNamesToDelete, sharedTablesToDelete); } catch (ColumnFamilyNotFoundException e) { return new MetaDataMutationResult( MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager @@ -2677,8 +2852,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return new MetaDataMutationResult( MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager .currentTimeMillis(), table, columnToDelete); - } catch (ClassNotFoundException e1) { - } + } } } } @@ -2689,7 +2863,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } tableMetaData.addAll(additionalTableMetaData); - return null; + long currentTime = MetaDataUtil.getClientTimeStamp(tableMetaData); + return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null, tableNamesToDelete, sharedTablesToDelete); } }); if (result != null) { @@ -2699,6 +2874,58 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso ProtobufUtil.setControllerException(controller, ioe); } } + + private void dropIndexes(PTable table, HRegion region, List<ImmutableBytesPtr> invalidateList, + List<RowLock> locks, long clientTimeStamp, byte[] schemaName, + byte[] tableName, List<Mutation> additionalTableMetaData, PColumn columnToDelete, + List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete) + throws IOException, SQLException { + // Look for columnToDelete in any indexes. 
If found as PK column, get lock and drop the + // index and then invalidate it + // Covered columns are deleted from the index by the client + PhoenixConnection connection = null; + try { + connection = table.getIndexes().isEmpty() ? null : QueryUtil.getConnection( + env.getConfiguration()).unwrap(PhoenixConnection.class); + } catch (ClassNotFoundException e) { + } + for (PTable index : table.getIndexes()) { + byte[] tenantId = index.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : index.getTenantId().getBytes(); + IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection); + byte[] indexKey = + SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index + .getTableName().getBytes()); + // If index requires this column for its pk, then drop it + if (indexMaintainer.getIndexedColumns().contains( + new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete + .getName().getBytes()))) { + // Since we're dropping the index, lock it to ensure + // that a change in index state doesn't + // occur while we're dropping it. + acquireLock(region, indexKey, locks); + // Drop the index table. 
The doDropTable will expand + // this to all of the table rows and invalidate the + // index table + additionalTableMetaData.add(new Delete(indexKey, clientTimeStamp)); + byte[] linkKey = + MetaDataUtil.getParentLinkKey(tenantId, schemaName, tableName, index + .getTableName().getBytes()); + // Drop the link between the data table and the + // index table + additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp)); + doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index + .getTableName().getBytes(), tableName, index.getType(), + additionalTableMetaData, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false); + invalidateList.add(new ImmutableBytesPtr(indexKey)); + } + // If the dropped column is a covered index column, invalidate the index + else if (indexMaintainer.getCoverededColumns().contains( + new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete + .getName().getBytes()))) { + invalidateList.add(new ImmutableBytesPtr(indexKey)); + } + } + } @Override public void clearCache(RpcController controller, ClearCacheRequest request, http://git-wip-us.apache.org/repos/asf/phoenix/blob/fccbe56a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 7a7cf31..f8b4c79 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -29,10 +29,14 @@ import org.apache.phoenix.coprocessor.generated.PFunctionProtos; import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnImpl; +import 
org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.util.ByteUtil; +import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; @@ -98,11 +102,77 @@ public abstract class MetaDataProtocol extends MetaDataService { NO_OP }; + public static class SharedTableState { + private PName tenantId; + private PName schemaName; + private PName tableName; + private List<PColumn> columns; + private List<PName> physicalNames; + private Short viewIndexId; + + public SharedTableState(PTable table) { + this.tenantId = table.getTenantId(); + this.schemaName = table.getSchemaName(); + this.tableName = table.getTableName(); + this.columns = table.getColumns(); + this.physicalNames = table.getPhysicalNames(); + this.viewIndexId = table.getViewIndexId(); + } + + public SharedTableState( + org.apache.phoenix.coprocessor.generated.MetaDataProtos.SharedTableState sharedTable) { + this.tenantId = sharedTable.hasTenantId() ? 
PNameFactory.newName(sharedTable.getTenantId().toByteArray()) : null; + this.schemaName = PNameFactory.newName(sharedTable.getSchemaName().toByteArray()); + this.tableName = PNameFactory.newName(sharedTable.getTableName().toByteArray()); + this.columns = Lists.transform(sharedTable.getColumnsList(), + new Function<org.apache.phoenix.coprocessor.generated.PTableProtos.PColumn, PColumn>() { + @Override + public PColumn apply(org.apache.phoenix.coprocessor.generated.PTableProtos.PColumn column) { + return PColumnImpl.createFromProto(column); + } + }); + this.physicalNames = Lists.transform(sharedTable.getPhysicalNamesList(), + new Function<ByteString, PName>() { + @Override + public PName apply(ByteString physicalName) { + return PNameFactory.newName(physicalName.toByteArray()); + } + }); + this.viewIndexId = (short)sharedTable.getViewIndexId(); + } + + public PName getTenantId() { + return tenantId; + } + + public PName getSchemaName() { + return schemaName; + } + + public PName getTableName() { + return tableName; + } + + public List<PColumn> getColumns() { + return columns; + } + + public List<PName> getPhysicalNames() { + return physicalNames; + } + + public Short getViewIndexId() { + return viewIndexId; + } + + } + public static class MetaDataMutationResult { private MutationCode returnCode; private long mutationTime; private PTable table; private List<byte[]> tableNamesToDelete; + private List<SharedTableState> sharedTablesToDelete; private byte[] columnName; private byte[] familyName; private boolean wasUpdated; @@ -142,6 +212,11 @@ public abstract class MetaDataProtocol extends MetaDataService { this.table = table; this.tableNamesToDelete = tableNamesToDelete; } + + public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete) { + this(returnCode, currentTime, table, tableNamesToDelete); + this.sharedTablesToDelete = sharedTablesToDelete; + } public 
MutationCode getMutationCode() { return returnCode; @@ -182,6 +257,10 @@ public abstract class MetaDataProtocol extends MetaDataService { public List<PFunction> getFunctions() { return functions; } + + public List<SharedTableState> getSharedTablesToDelete() { + return sharedTablesToDelete; + } public static MetaDataMutationResult constructFromProto(MetaDataResponse proto) { MetaDataMutationResult result = new MetaDataMutationResult(); @@ -210,6 +289,14 @@ public abstract class MetaDataProtocol extends MetaDataService { if(proto.hasFamilyName()){ result.familyName = proto.getFamilyName().toByteArray(); } + if(proto.getSharedTablesToDeleteCount() > 0) { + result.sharedTablesToDelete = + Lists.newArrayListWithExpectedSize(proto.getSharedTablesToDeleteCount()); + for (org.apache.phoenix.coprocessor.generated.MetaDataProtos.SharedTableState sharedTable : + proto.getSharedTablesToDeleteList()) { + result.sharedTablesToDelete.add(new SharedTableState(sharedTable)); + } + } return result; } @@ -234,6 +321,25 @@ public abstract class MetaDataProtocol extends MetaDataService { if(result.getFamilyName() != null){ builder.setFamilyName(ByteStringer.wrap(result.getFamilyName())); } + if (result.getSharedTablesToDelete() !=null){ + for (SharedTableState sharedTableState : result.sharedTablesToDelete) { + org.apache.phoenix.coprocessor.generated.MetaDataProtos.SharedTableState.Builder sharedTableStateBuilder = + org.apache.phoenix.coprocessor.generated.MetaDataProtos.SharedTableState.newBuilder(); + for (PColumn col : sharedTableState.getColumns()) { + sharedTableStateBuilder.addColumns(PColumnImpl.toProto(col)); + } + for (PName physicalName : sharedTableState.getPhysicalNames()) { + sharedTableStateBuilder.addPhysicalNames(ByteStringer.wrap(physicalName.getBytes())); + } + if (sharedTableState.getTenantId()!=null) { + sharedTableStateBuilder.setTenantId(ByteStringer.wrap(sharedTableState.getTenantId().getBytes())); + } + 
sharedTableStateBuilder.setSchemaName(ByteStringer.wrap(sharedTableState.getSchemaName().getBytes())); + sharedTableStateBuilder.setTableName(ByteStringer.wrap(sharedTableState.getTableName().getBytes())); + sharedTableStateBuilder.setViewIndexId(sharedTableState.getViewIndexId()); + builder.addSharedTablesToDelete(sharedTableStateBuilder.build()); + } + } } return builder.build(); }
