This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push: new 57058dd055 PHOENIX-6953 Creating indexes on a table with old indexing leads to inconsistent co-processors 57058dd055 is described below commit 57058dd055d6d783cacfbaf2acf413ebc315edbf Author: Istvan Toth <st...@apache.org> AuthorDate: Thu May 11 15:58:26 2023 +0200 PHOENIX-6953 Creating indexes on a table with old indexing leads to inconsistent co-processors --- .../org/apache/phoenix/end2end/CreateTableIT.java | 153 ++++++++++++++++++++- .../phoenix/query/ConnectionQueryServicesImpl.java | 29 ++-- 2 files changed, 170 insertions(+), 12 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java index df46118165..f1b3a98fcf 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.end2end; +import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -36,20 +37,22 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Properties; +import java.util.UUID; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; -import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; -import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import 
org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.mapreduce.index.IndexUpgradeTool; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; @@ -71,6 +74,7 @@ import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -1518,6 +1522,151 @@ public class CreateTableIT extends ParallelStatsDisabledIT { .getColumnQualifierBytes())); } + // Test for PHOENIX-6953 + @Test + public void testCoprocessorsForCreateIndexOnOldImplementation() throws Exception { + String tableName = generateUniqueName(); + String index1Name = generateUniqueName(); + String index2Name = generateUniqueName(); + + String ddl = + "create table " + tableName + " ( k integer PRIMARY KEY," + " v1 integer," + + " v2 integer)"; + String index1Ddl = "create index " + index1Name + " on " + tableName + " (v1)"; + String index2Ddl = "create index " + index2Name + " on " + tableName + " (v2)"; + + Properties props = new Properties(); + Admin admin = driver.getConnectionQueryServices(getUrl(), props).getAdmin(); + + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement();) { + stmt.execute(ddl); + stmt.execute(index1Ddl); + + TableDescriptor index1DescriptorBefore = + admin.getDescriptor(TableName.valueOf(index1Name)); + assertTrue(index1DescriptorBefore + .hasCoprocessor(org.apache.phoenix.index.GlobalIndexChecker.class.getName())); + + // Now roll back to the old indexing + IndexUpgradeTool iut = + new IndexUpgradeTool(ROLLBACK_OP, tableName, 
null, + "/tmp/index_upgrade_" + UUID.randomUUID().toString(), false, null, + false); + iut.setConf(getUtility().getConfiguration()); + iut.prepareToolSetup(); + assertEquals(0, iut.executeTool()); + + TableDescriptor index1DescriptorAfter = + admin.getDescriptor(TableName.valueOf(index1Name)); + assertFalse(index1DescriptorAfter + .hasCoprocessor(org.apache.phoenix.index.GlobalIndexChecker.class.getName())); + + // New index must not have GlobalIndexChecker + stmt.execute(index2Ddl); + TableDescriptor index2Descriptor = admin.getDescriptor(TableName.valueOf(index2Name)); + assertFalse(index2Descriptor + .hasCoprocessor(org.apache.phoenix.index.GlobalIndexChecker.class.getName())); + } + } + + // Test for PHOENIX-6953 + @Test + public void testCoprocessorsForTransactionalCreateIndexOnOldImplementation() throws Exception { + String tableName = generateUniqueName(); + String index1Name = generateUniqueName(); + + String ddl = + "create table " + tableName + " ( k integer PRIMARY KEY," + " v1 integer," + + " v2 integer) TRANSACTIONAL=TRUE"; + String index1Ddl = "create index " + index1Name + " on " + tableName + " (v1)"; + + Properties props = new Properties(); + Admin admin = driver.getConnectionQueryServices(getUrl(), props).getAdmin(); + + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement();) { + stmt.execute(ddl); + stmt.execute(index1Ddl); + + // Transactional indexes don't have GlobalIndexChecker + TableDescriptor index1DescriptorBefore = + admin.getDescriptor(TableName.valueOf(index1Name)); + assertFalse(index1DescriptorBefore + .hasCoprocessor(org.apache.phoenix.index.GlobalIndexChecker.class.getName())); + + // Now roll back to the old indexing + IndexUpgradeTool iut = + new IndexUpgradeTool(ROLLBACK_OP, tableName, null, + "/tmp/index_upgrade_" + UUID.randomUUID().toString(), false, null, + false); + iut.setConf(getUtility().getConfiguration()); + iut.prepareToolSetup(); + assertEquals(0, iut.executeTool()); 
+ + // Make sure we don't add GlobalIndexChecker + TableDescriptor index1DescriptorAfter = + admin.getDescriptor(TableName.valueOf(index1Name)); + assertFalse(index1DescriptorAfter + .hasCoprocessor(org.apache.phoenix.index.GlobalIndexChecker.class.getName())); + + // We should also test for setting / unsetting the transactional status, but both are + // forbidden at the time of writing. + } + } + + @Test + @Ignore // This would only work with Tephra, and we can't run Tephra tests now + public void testCoprocessorsWhenAddingTransactionaFlag() throws Exception { + String tableName = generateUniqueName(); + String index1Name = generateUniqueName(); + + String ddl = + "create table " + tableName + " ( k integer PRIMARY KEY," + " v1 integer," + + " v2 integer)"; + String dropDdl = "drop table " + tableName; + String index1Ddl = "create index " + index1Name + " on " + tableName + " (v1)"; + String setTransactional = + "alter table " + tableName + + " SET TRANSACTIONAL=true, TRANSACTION_PROVIDER='TEPHRA' "; + + Properties props = new Properties(); + Admin admin = driver.getConnectionQueryServices(getUrl(), props).getAdmin(); + + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement();) { + stmt.execute(ddl); + stmt.execute(index1Ddl); + + stmt.executeUpdate(setTransactional); + + // Transactional indexes don't have GlobalIndexChecker + TableDescriptor index1Descriptor = + admin.getDescriptor(TableName.valueOf(index1Name)); + assertFalse(index1Descriptor + .hasCoprocessor(org.apache.phoenix.index.GlobalIndexChecker.class.getName())); + + //The same for the old indexing + stmt.execute(dropDdl); + stmt.execute(ddl); + stmt.execute(index1Ddl); + + // Now roll back to the old indexing + IndexUpgradeTool iut = + new IndexUpgradeTool(ROLLBACK_OP, tableName, null, + "/tmp/index_upgrade_" + UUID.randomUUID().toString(), false, null, + false); + iut.setConf(getUtility().getConfiguration()); + iut.prepareToolSetup(); + assertEquals(0, 
iut.executeTool()); + + stmt.executeUpdate(setTransactional); + //make sure we not add GlobalIndexChecker + assertFalse(index1Descriptor + .hasCoprocessor(org.apache.phoenix.index.GlobalIndexChecker.class.getName())); + } + } + public static long verifyLastDDLTimestamp(String dataTableFullName, long startTS, Connection conn) throws SQLException { long endTS = EnvironmentEdgeManager.currentTimeMillis(); //Now try the PTable API diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 86ed6713ee..c0fa6d8bc5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -895,6 +895,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement : TableDescriptorBuilder.newBuilder(TableName.valueOf(physicalTableName)); ColumnFamilyDescriptor dataTableColDescForIndexTablePropSyncing = null; + boolean doNotAddGlobalIndexChecker = false; if (tableType == PTableType.INDEX || MetaDataUtil.isViewIndex(Bytes.toString(physicalTableName))) { byte[] defaultFamilyBytes = defaultFamilyName == null ? 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : Bytes.toBytes(defaultFamilyName); @@ -920,6 +921,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (dataTableColDescForIndexTablePropSyncing == null) { dataTableColDescForIndexTablePropSyncing = baseTableDesc.getColumnFamilies()[0]; } + if (baseTableDesc.hasCoprocessor(org.apache.phoenix.hbase.index.Indexer.class.getName())) { + // The base table still uses the old indexing + doNotAddGlobalIndexChecker = true; + } } // By default, do not automatically rebuild/catch up an index on a write failure // Add table-specific properties to the table descriptor @@ -968,7 +973,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } addCoprocessors(physicalTableName, tableDescriptorBuilder, - tableType, tableProps, existingDesc); + tableType, tableProps, existingDesc, doNotAddGlobalIndexChecker); // PHOENIX-3072: Set index priority if this is a system table or index table if (tableType == PTableType.SYSTEM) { @@ -995,8 +1000,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private void addCoprocessors(byte[] tableName, TableDescriptorBuilder builder, - PTableType tableType, Map<String,Object> tableProps, - TableDescriptor existingDesc) throws SQLException { + PTableType tableType, Map<String, Object> tableProps, TableDescriptor existingDesc, + boolean doNotAddGlobalIndexChecker) throws SQLException { // The phoenix jar must be available on HBase classpath int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY); try { @@ -1027,7 +1032,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (newDesc.hasCoprocessor(IndexRegionObserver.class.getName())) { builder.removeCoprocessor(IndexRegionObserver.class.getName()); } - builder.addCoprocessor(GlobalIndexChecker.class.getName(), null, priority - 1, null); + if 
(!doNotAddGlobalIndexChecker) { + builder.addCoprocessor(GlobalIndexChecker.class.getName(), null, priority - 1, null); + } } } @@ -2429,7 +2436,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement Map<TableDescriptor, TableDescriptor> oldToNewTableDescriptors) throws SQLException { byte[] physicalTableName = table.getPhysicalName().getBytes(); try (Admin admin = getAdmin()) { - setTransactional(physicalTableName, tableDescriptorBuilder, table.getType(), txValue, tableProps); + TableDescriptor baseDesc = admin.getDescriptor(TableName.valueOf(physicalTableName)); + boolean hasOldIndexing = baseDesc.hasCoprocessor(org.apache.phoenix.hbase.index.Indexer.class.getName()); + setTransactional(physicalTableName, tableDescriptorBuilder, table.getType(), txValue, tableProps, hasOldIndexing); Map<String, Object> indexTableProps; if (txValue == null) { indexTableProps = Collections.emptyMap(); @@ -2475,7 +2484,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement indexDescriptorBuilder.setColumnFamily(indexColDescriptor.build()); } } - setTransactional(index.getPhysicalName().getBytes(), indexDescriptorBuilder, index.getType(), txValue, indexTableProps); + setTransactional(index.getPhysicalName().getBytes(), indexDescriptorBuilder, index.getType(), txValue, indexTableProps, hasOldIndexing); descriptorsToUpdate.add(indexDescriptorBuilder.build()); } try { @@ -2489,7 +2498,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } TableDescriptorBuilder indexDescriptorBuilder = TableDescriptorBuilder.newBuilder(intermedIndexDesc); setSharedIndexMaxVersion(table, tableDescriptorBuilder.build(), indexDescriptorBuilder); - setTransactional(MetaDataUtil.getViewIndexPhysicalName(physicalTableName), indexDescriptorBuilder, PTableType.INDEX, txValue, indexTableProps); + setTransactional(MetaDataUtil.getViewIndexPhysicalName(physicalTableName), indexDescriptorBuilder, PTableType.INDEX, 
txValue, indexTableProps, hasOldIndexing); descriptorsToUpdate.add(indexDescriptorBuilder.build()); } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { // Ignore, as we may never have created a view index table @@ -2505,7 +2514,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } TableDescriptorBuilder indexDescriptorBuilder = TableDescriptorBuilder.newBuilder(intermedIndexDesc); setSharedIndexMaxVersion(table, tableDescriptorBuilder.build(), indexDescriptorBuilder); - setTransactional(MetaDataUtil.getViewIndexPhysicalName(physicalTableName), indexDescriptorBuilder, PTableType.INDEX, txValue, indexTableProps); + setTransactional(MetaDataUtil.getViewIndexPhysicalName(physicalTableName), indexDescriptorBuilder, PTableType.INDEX, txValue, indexTableProps, hasOldIndexing); descriptorsToUpdate.add(indexDescriptorBuilder.build()); } catch (org.apache.hadoop.hbase.TableNotFoundException ignore) { // Ignore, as we may never have created a local index @@ -2561,13 +2570,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } - private void setTransactional(byte[] physicalTableName, TableDescriptorBuilder tableDescriptorBuilder, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException { + private void setTransactional(byte[] physicalTableName, TableDescriptorBuilder tableDescriptorBuilder, PTableType tableType, String txValue, Map<String, Object> tableProps, boolean hasOldIndexing) throws SQLException { if (txValue == null) { tableDescriptorBuilder.removeValue(Bytes.toBytes(PhoenixTransactionContext.READ_NON_TX_DATA)); } else { tableDescriptorBuilder.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, txValue); } - this.addCoprocessors(physicalTableName, tableDescriptorBuilder, tableType, tableProps, null); + this.addCoprocessors(physicalTableName, tableDescriptorBuilder, tableType, tableProps, null, hasOldIndexing); } private Map<TableDescriptor, 
TableDescriptor> separateAndValidateProperties(PTable table,