Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.2 dacc695cf -> 767ce8a8a
PHOENIX-4798 Update encoded col qualifiers on the base table correctly Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/767ce8a8 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/767ce8a8 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/767ce8a8 Branch: refs/heads/4.x-HBase-1.2 Commit: 767ce8a8a3fb7b518cababbfa871f87d54b7a632 Parents: dacc695 Author: Thomas D'Silva <tdsi...@apache.org> Authored: Wed Aug 15 12:23:56 2018 -0700 Committer: Thomas D'Silva <tdsi...@apache.org> Committed: Tue Aug 28 12:18:40 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/phoenix/end2end/ViewIT.java | 184 +++++++++++++------ .../coprocessor/MetaDataEndpointImpl.java | 74 +++++--- .../PhoenixMetaDataCoprocessorHost.java | 2 +- .../apache/phoenix/schema/MetaDataClient.java | 33 +++- 4 files changed, 208 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/767ce8a8/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java index fda9490..c1a7ff5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java @@ -45,6 +45,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -59,19 +60,16 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import 
org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.coprocessor.BaseMetaDataEndpointObserver; +import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost; +import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment; import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; @@ -98,36 +96,39 @@ import org.junit.runners.Parameterized.Parameters; import com.google.common.base.Predicate; import com.google.common.collect.Collections2; import com.google.common.collect.Maps; - @RunWith(Parameterized.class) public class ViewIT extends SplitSystemCatalogIT { protected String tableDDLOptions; protected boolean transactional; + protected boolean columnEncoded; - private static final String FAILED_VIEWNAME = "FAILED_VIEW"; - private static final byte[] FAILED_ROWKEY_BYTES = - SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2), Bytes.toBytes(FAILED_VIEWNAME)); - private static final String SLOW_VIEWNAME_PREFIX = "SLOW_VIEW"; - private static final byte[] SLOW_ROWKEY_PREFIX_BYTES = - SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2), - Bytes.toBytes(SLOW_VIEWNAME_PREFIX)); + private static final String FAILED_VIEWNAME = 
SchemaUtil.getTableName(SCHEMA2, "FAILED_VIEW"); + private static final String SLOW_VIEWNAME_PREFIX = SchemaUtil.getTableName(SCHEMA2, "SLOW_VIEW"); private static volatile CountDownLatch latch1 = null; private static volatile CountDownLatch latch2 = null; - public ViewIT(boolean transactional) { + public ViewIT(boolean transactional, boolean columnEncoded) { StringBuilder optionBuilder = new StringBuilder(); this.transactional = transactional; + this.columnEncoded = columnEncoded; if (transactional) { optionBuilder.append(" TRANSACTIONAL=true "); } + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } this.tableDDLOptions = optionBuilder.toString(); } - @Parameters(name = "transactional = {0}") - public static Collection<Boolean> data() { - return Arrays.asList(new Boolean[] { false, true }); + @Parameters(name="ViewIT_transactional={0}, columnEncoded={1}") // name is used by failsafe as file name in reports + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { true, false }, { true, true }, + { false, false }, { false, true }}); } @BeforeClass @@ -136,7 +137,9 @@ public class ViewIT extends SplitSystemCatalogIT { Map<String, String> props = Collections.emptyMap(); boolean splitSystemCatalog = (driver == null); Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1); - serverProps.put("hbase.coprocessor.region.classes", TestMetaDataRegionObserver.class.getName()); + serverProps.put(QueryServices.PHOENIX_ACLS_ENABLED, "true"); + serverProps.put(PhoenixMetaDataCoprocessorHost.PHOENIX_META_DATA_COPROCESSOR_CONF_KEY, + TestMetaDataRegionObserver.class.getName()); serverProps.put("hbase.coprocessor.abortonerror", "false"); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(props.entrySet().iterator())); // Split SYSTEM.CATALOG once after the mini-cluster is started @@ -145,17 +148,36 @@ public 
class ViewIT extends SplitSystemCatalogIT { } } - public static class TestMetaDataRegionObserver extends BaseRegionObserver { + public static class TestMetaDataRegionObserver extends BaseMetaDataEndpointObserver { + + @Override + public void preAlterTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, + String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType type) throws IOException{ + processTable(tableName); + } + + @Override + public void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, + String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType, + Set<byte[]> familySet, Set<TableName> indexes) throws IOException { + processTable(tableName); + } + @Override - public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, - MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - if (shouldFail(c, miniBatchOp.getOperation(0))) { + public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId, + String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType, + List<PTable> indexes) throws IOException { + processTable(tableName); + } + + private void processTable(String tableName) throws DoNotRetryIOException { + if (tableName.equals(FAILED_VIEWNAME)) { // throwing anything other than instances of IOException result // in this coprocessor being unloaded // DoNotRetryIOException tells HBase not to retry this mutation // multiple times throw new DoNotRetryIOException(); - } else if (shouldSlowDown(c, miniBatchOp.getOperation(0))) { + } else if (tableName.startsWith(SLOW_VIEWNAME_PREFIX)) { // simulate a slow write to SYSTEM.CATALOG if (latch1 != null) { latch1.countDown(); @@ -172,20 +194,7 @@ public class ViewIT extends SplitSystemCatalogIT { } } } - - private boolean 
shouldFail(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) { - TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable(); - return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) - && (Bytes.equals(FAILED_ROWKEY_BYTES, m.getRow())); - } - - private boolean shouldSlowDown(ObserverContext<RegionCoprocessorEnvironment> c, - Mutation m) { - TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable(); - byte[] rowKeyPrefix = Arrays.copyOf(m.getRow(), SLOW_ROWKEY_PREFIX_BYTES.length); - return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) - && (Bytes.equals(SLOW_ROWKEY_PREFIX_BYTES, rowKeyPrefix)); - } + } @Test @@ -598,9 +607,6 @@ public class ViewIT extends SplitSystemCatalogIT { public void testViewAndTableAndDropCascadeWithIndexes() throws Exception { // Setup - Tables and Views with Indexes Connection conn = DriverManager.getConnection(getUrl()); - if (tableDDLOptions.length()!=0) - tableDDLOptions+=","; - tableDDLOptions+="IMMUTABLE_ROWS=true"; String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName()); String ddl = "CREATE TABLE " + fullTableName + " (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)" + tableDDLOptions; conn.createStatement().execute(ddl); @@ -1311,7 +1317,7 @@ public class ViewIT extends SplitSystemCatalogIT { public void testChildViewCreationFails() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName()); - String fullViewName1 = SchemaUtil.getTableName(SCHEMA2, FAILED_VIEWNAME); + String fullViewName1 = FAILED_VIEWNAME; String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName()); String tableDdl = "CREATE TABLE " + fullTableName + " (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)" + tableDDLOptions; @@ -1343,9 +1349,7 @@ public class ViewIT extends SplitSystemCatalogIT { public void 
testConcurrentViewCreationAndTableDrop() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl())) { String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName()); - String fullViewName1 = - SchemaUtil.getTableName(SCHEMA2, - SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName()); + String fullViewName1 = SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName(); String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName()); latch1 = new CountDownLatch(1); latch2 = new CountDownLatch(1); @@ -1392,12 +1396,12 @@ public class ViewIT extends SplitSystemCatalogIT { } @Test - public void testConcurrentAddColumn() throws Exception { + public void testConcurrentAddSameColumnDifferentType() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl())) { + latch1 = null; + latch2 = null; String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName()); - String fullViewName1 = - SchemaUtil.getTableName(SCHEMA2, - SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName()); + String fullViewName1 = SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName(); String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName()); // create base table String tableDdl = @@ -1422,10 +1426,10 @@ public class ViewIT extends SplitSystemCatalogIT { } }); - // add a column to the view in a separate thread (which will take - // some time to complete) + // add a column with the same name and different type to the view in a separate thread + // (which will take some time to complete) Future<Exception> future = executorService.submit(new AddColumnRunnable(fullViewName1)); - // wait till the thread makes the rpc to create the view + // wait till the thread makes the rpc to add the column boolean result = latch1.await(2, TimeUnit.MINUTES); if (!result) { fail("The create view rpc look too long"); @@ -1451,6 +1455,82 @@ public class ViewIT extends SplitSystemCatalogIT { conn.createStatement().execute(tableDdl); } } + + @Test + 
public void testConcurrentAddDifferentColumn() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + latch1 = null; + latch2 = null; + String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName()); + String fullViewName1 = SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName(); + String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName()); + String fullViewName3 = SchemaUtil.getTableName(SCHEMA4, generateUniqueName()); + // create base table + String tableDdl = + "CREATE TABLE " + fullTableName + " (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)" + + tableDDLOptions; + conn.createStatement().execute(tableDdl); + // create a two views + String ddl = + "CREATE VIEW " + fullViewName1 + " (v2 VARCHAR) AS SELECT * FROM " + + fullTableName + " WHERE k = 6"; + conn.createStatement().execute(ddl); + ddl = + "CREATE VIEW " + fullViewName3 + " (v2 VARCHAR) AS SELECT * FROM " + + fullTableName + " WHERE k = 7"; + conn.createStatement().execute(ddl); + + latch1 = new CountDownLatch(1); + latch2 = new CountDownLatch(1); + ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setPriority(Thread.MIN_PRIORITY); + return t; + } + }); + + // add a column to a view in a separate thread (we slow this operation down) + Future<Exception> future = executorService.submit(new AddColumnRunnable(fullViewName1)); + // wait till the thread makes the rpc to add the column + boolean result = latch1.await(2, TimeUnit.MINUTES); + if (!result) { + fail("The alter view rpc look too long"); + } + tableDdl = "ALTER VIEW " + fullViewName3 + " ADD v4 INTEGER"; + try { + // add a column to another view + conn.createStatement().execute(tableDdl); + if (columnEncoded) { + // this should fail as the previous add column is still not complete + fail( + "Adding columns to two different 
views concurrently where the base table uses encoded column should fail"); + } + } catch (ConcurrentTableMutationException e) { + if (!columnEncoded) { + // this should not fail as we don't need to update the parent table for non + // column encoded tables + fail( + "Adding columns to two different views concurrently where the base table does not use encoded columns should succeed"); + } + } + latch2.countDown(); + + Exception e = future.get(); + // the slowed-down add column on fullViewName1 is expected to succeed in both cases (with column encoding it is the ALTER VIEW on fullViewName3 above that fails) + assertNull(e); + + // add the same column to another view to ensure that the cell used + // to prevent concurrent modifications was removed + ddl = "CREATE VIEW " + fullViewName2 + " (v2 VARCHAR) AS SELECT * FROM " + + fullTableName + " WHERE k = 6"; + conn.createStatement().execute(ddl); + tableDdl = "ALTER VIEW " + fullViewName2 + " ADD v3 INTEGER"; + conn.createStatement().execute(tableDdl); + } + } private class CreateViewRunnable implements Callable<Exception> { private final String fullTableName; http://git-wip-us.apache.org/repos/asf/phoenix/blob/767ce8a8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 5e8a5dc..e748115 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -2208,8 +2208,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso MetaDataResponse response = processRemoteRegionMutations( PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES, - childLinkMutations, fullTableName, - MetaDataProtos.MutationCode.UNABLE_TO_CREATE_CHILD_LINK); +
childLinkMutations, MetaDataProtos.MutationCode.UNABLE_TO_CREATE_CHILD_LINK); if (response != null) { done.run(response); return; @@ -2229,8 +2228,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso response = processRemoteRegionMutations( PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, - remoteMutations, fullTableName, - MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE); + remoteMutations, MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE); + clearParentTableFromCache(clientTimeStamp, + parentTable.getSchemaName() != null + ? parentTable.getSchemaName().getBytes() + : ByteUtil.EMPTY_BYTE_ARRAY, + parentTable.getName().getBytes()); if (response != null) { done.run(response); return; @@ -2484,8 +2487,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso MetaDataResponse response = processRemoteRegionMutations( PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES, - childLinkMutations, SchemaUtil.getTableName(schemaName, tableName), - MetaDataProtos.MutationCode.UNABLE_TO_CREATE_CHILD_LINK); + childLinkMutations, MetaDataProtos.MutationCode.UNABLE_TO_CREATE_CHILD_LINK); if (response!=null) { done.run(response); return; @@ -2521,8 +2523,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } private MetaDataResponse processRemoteRegionMutations(byte[] systemTableName, - List<Mutation> remoteMutations, String tableName, - MetaDataProtos.MutationCode mutationCode) throws IOException { + List<Mutation> remoteMutations, MetaDataProtos.MutationCode mutationCode) throws IOException { MetaDataResponse.Builder builder = MetaDataResponse.newBuilder(); try (Table hTable = env.getTable( @@ -2780,7 +2781,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso .getEncodingScheme() != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) { processRemoteRegionMutations( PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, remoteMutations, - 
fullTableName, MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE); + MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE); + clearParentTableFromCache(clientTimeStamp, + table.getParentSchemaName() != null + ? table.getParentSchemaName().getBytes() + : ByteUtil.EMPTY_BYTE_ARRAY, + table.getParentTableName().getBytes()); } else { String msg = "Found unexpected mutations while adding or dropping column to "+fullTableName; @@ -2815,6 +2821,25 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return null; // impossible } } + + /** + * Removes the table from the server side cache + */ + private void clearParentTableFromCache(long clientTimeStamp, byte[] schemaName, byte[] tableName) throws SQLException { + // remove the parent table from the metadata cache as we just mutated the table + Properties props = new Properties(); + if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) { + props.setProperty("CurrentSCN", Long.toString(clientTimeStamp)); + } + try (PhoenixConnection connection = + QueryUtil.getConnectionOnServer(props, env.getConfiguration()) + .unwrap(PhoenixConnection.class)) { + ConnectionQueryServices queryServices = connection.getQueryServices(); + queryServices.clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, schemaName, tableName, + clientTimeStamp); + } catch (ClassNotFoundException e) { + } + } private static boolean isDivergedView(PTable view) { return view.getBaseColumnCount() == QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT; @@ -3205,28 +3230,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } } - if (type == PTableType.VIEW - && EncodedColumnsUtil.usesEncodedColumnNames(table)) { - /* - * When adding a column to a view that uses encoded column name scheme, we - * need to modify the CQ counters stored in the view's physical table. So to - * make sure clients get the latest PTable, we need to invalidate the cache - * entry. 
- */ - invalidateList.add(new ImmutableBytesPtr(MetaDataUtil - .getPhysicalTableRowForView(table))); - - - - } + boolean addingCol = false; for (Mutation m : tableMetaData) { byte[] key = m.getRow(); boolean addingPKColumn = false; int pkCount = getVarChars(key, rowKeyMetaData); + // this means we are adding a column if (pkCount > COLUMN_NAME_INDEX && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0 && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) { try { + addingCol = true; if (pkCount > FAMILY_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) { PColumnFamily family = @@ -3291,6 +3305,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } tableMetaData.addAll(additionalTableMetadataMutations); + if (type == PTableType.VIEW + && EncodedColumnsUtil.usesEncodedColumnNames(table) && addingCol + && !table.isAppendOnlySchema()) { + // When adding a column to a view that uses encoded column name + // scheme, we need to modify the CQ counters stored in the view's + // physical table. So to make sure clients get the latest PTable, we + // need to invalidate the cache entry.
+ // If the table uses APPEND_ONLY_SCHEMA we use the position of the + // column as the encoded column qualifier and so we don't need to + // update the CQ counter in the view physical table (see + // PHOENIX-4737) + invalidateList.add(new ImmutableBytesPtr( + MetaDataUtil.getPhysicalTableRowForView(table))); + } return null; } }, request.getClientVersion()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/767ce8a8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java index 15b0020..059bca1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java @@ -104,7 +104,7 @@ public class PhoenixMetaDataCoprocessorHost /** * Encapsulation of the environment of each coprocessor */ - static class PhoenixMetaDataControllerEnvironment extends CoprocessorHost.Environment + public static class PhoenixMetaDataControllerEnvironment extends CoprocessorHost.Environment implements RegionCoprocessorEnvironment { private RegionCoprocessorEnvironment env; http://git-wip-us.apache.org/repos/asf/phoenix/blob/767ce8a8/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index c714eab..1114463 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -3321,6 +3321,7 @@ public 
class MetaDataClient { String physicalTableName = SchemaUtil.getTableNameFromFullName(physicalName.getString()); Set<String> acquiredColumnMutexSet = Sets.newHashSetWithExpectedSize(3); + boolean acquiredMutex = false; try { connection.setAutoCommit(false); @@ -3599,17 +3600,26 @@ public class MetaDataClient { } } - boolean acquiredMutex = true; - for (PColumn pColumn : columns) { - // acquire the mutex using the global physical table name to - // prevent creating the same column on a table or view with - // a conflicting type etc - acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName, - pColumn.getName().getString()); + if (EncodedColumnsUtil.usesEncodedColumnNames(table)) { + // for tables that use column encoding acquire a mutex on the base table as we + // need to update the encoded column qualifier counter on the base table + acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName, null); if (!acquiredMutex) { throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName); } - acquiredColumnMutexSet.add(pColumn.getName().getString()); + } + else { + for (PColumn pColumn : columns) { + // acquire the mutex using the global physical table name to + // prevent creating the same column on a table or view with + // a conflicting type etc + acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName, + pColumn.getName().getString()); + if (!acquiredMutex) { + throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName); + } + acquiredColumnMutexSet.add(pColumn.getName().getString()); + } } MetaDataMutationResult result = connection.getQueryServices().addColumn(tableMetaData, table, properties, colFamiliesForPColumnsToBeAdded, columns); try { @@ -3681,7 +3691,12 @@ public class MetaDataClient { } } finally { connection.setAutoCommit(wasAutoCommit); - if (!acquiredColumnMutexSet.isEmpty()) { + if (EncodedColumnsUtil.usesEncodedColumnNames(table) && acquiredMutex) { + // release the mutex 
on the physical table (used to prevent concurrent conflicting + // add column changes) + deleteCell(null, physicalSchemaName, physicalTableName, null); + } + else if (!acquiredColumnMutexSet.isEmpty()) { for (String columnName : acquiredColumnMutexSet) { // release the mutex (used to prevent concurrent conflicting add column changes) deleteCell(null, physicalSchemaName, physicalTableName, columnName);