Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 61ea48c16 -> 18a47b051
PHOENIX-3002 Upgrading to 4.8 doesn't recreate local indexes(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/18a47b05 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/18a47b05 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/18a47b05 Branch: refs/heads/4.x-HBase-1.1 Commit: 18a47b05146755a93b1e49ab9b22f49a4229439b Parents: 61ea48c Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Fri Jun 24 02:27:57 2016 +0530 Committer: Rajeshbabu Chintaguntla <[email protected]> Committed: Fri Jun 24 02:27:57 2016 +0530 ---------------------------------------------------------------------- .../query/ConnectionQueryServicesImpl.java | 199 +---------------- .../org/apache/phoenix/query/QueryServices.java | 2 +- .../phoenix/query/QueryServicesOptions.java | 6 +- .../apache/phoenix/schema/MetaDataClient.java | 2 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 11 +- .../org/apache/phoenix/util/UpgradeUtil.java | 219 +++++++++++++++++-- 6 files changed, 224 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/18a47b05/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 5c17eb0..0a5c4f2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -21,17 +21,7 @@ import static org.apache.hadoop.hbase.HColumnDescriptor.TTL; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE; @@ -94,6 +84,7 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator; +import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; @@ -190,6 +181,7 @@ import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.ConfigUtil; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixContextExecutor; @@ -2472,19 +2464,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) { - Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); - props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); - props.remove(PhoenixRuntime.TENANT_ID_ATTRIB); - PhoenixConnection conn = - new PhoenixConnection(ConnectionQueryServicesImpl.this, - metaConnection.getURL(), props, metaConnection - .getMetaDataCache()); - try { - UpgradeUtil.upgradeLocalIndexes(conn, true); - } finally { - if (conn != null) conn.close(); - } - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2, @@ -2500,7 +2479,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " " + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = disableViewIndexes(metaConnection); + if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, + QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) { + metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection); + } + metaConnection = UpgradeUtil.disableViewIndexes(metaConnection); ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0); @@ -2722,176 +2705,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return metaConnection; } - - private PhoenixConnection disableViewIndexes(PhoenixConnection connParam) throws SQLException, IOException, InterruptedException, TimeoutException { - Properties props = PropertiesUtil.deepCopy(connParam.getClientInfo()); - Long originalScn = null; - String str = props.getProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB); - if (str != null) { - originalScn = Long.valueOf(str); - } - // don't use the passed timestamp as scn because we want to query all view indexes up to now. - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP)); - Set<String> physicalTables = new HashSet<>(); - SQLException sqlEx = null; - PhoenixConnection globalConnection = null; - PhoenixConnection toReturn = null; - try { - globalConnection = new PhoenixConnection(connParam, this, props); - String tenantId = null; - try (HBaseAdmin admin = getAdmin()) { - String fetchViewIndexes = "SELECT " + TENANT_ID + ", " + TABLE_SCHEM + ", " + TABLE_NAME + - ", " + DATA_TABLE_NAME + " FROM " + SYSTEM_CATALOG_NAME + " WHERE " + VIEW_INDEX_ID - + " IS NOT NULL AND " + INDEX_TYPE + "<>" + IndexType.LOCAL.getSerializedValue(); - String disableIndexDDL = "ALTER INDEX %s ON %s DISABLE"; - try (ResultSet rs = globalConnection.createStatement().executeQuery(fetchViewIndexes)) { - while (rs.next()) { - tenantId = rs.getString(1); - String indexSchema = rs.getString(2); - String indexName = rs.getString(3); - String viewName = rs.getString(4); - String fullIndexName = SchemaUtil.getTableName(indexSchema, indexName); - PTable viewPTable = null; - // Disable the view index and truncate the underlying hbase table. - // Users would need to rebuild the view indexes. - if (tenantId != null && !tenantId.isEmpty()) { - Properties newProps = PropertiesUtil.deepCopy(globalConnection.getClientInfo()); - newProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); - PTable indexPTable = null; - try (PhoenixConnection tenantConnection = new PhoenixConnection(globalConnection, this, newProps)) { - viewPTable = PhoenixRuntime.getTable(tenantConnection, viewName); - tenantConnection.createStatement().execute(String.format(disableIndexDDL, fullIndexName, viewName)); - indexPTable = PhoenixRuntime.getTable(tenantConnection, fullIndexName); - } - - int offset = indexPTable.getBucketNum() != null ? 1 : 0; - int existingTenantIdPosition = ++offset; // positions are stored 1 based - int existingViewIdxIdPosition = ++offset; - int newTenantIdPosition = existingViewIdxIdPosition; - int newViewIdxPosition = existingTenantIdPosition; - String tenantIdColumn = indexPTable.getColumns().get(existingTenantIdPosition - 1).getName().getString(); - int index = 0; - String updatePosition = - "UPSERT INTO " - + SYSTEM_CATALOG_NAME - + " ( " - + TENANT_ID - + "," - + TABLE_SCHEM - + "," - + TABLE_NAME - + "," - + COLUMN_NAME - + "," - + COLUMN_FAMILY - + "," - + ORDINAL_POSITION - + ") SELECT " - + TENANT_ID - + "," - + TABLE_SCHEM - + "," - + TABLE_NAME - + "," - + COLUMN_NAME - + "," - + COLUMN_FAMILY - + "," - + "?" - + " FROM " - + SYSTEM_CATALOG_NAME - + " WHERE " - + TENANT_ID - + " = ? " - + " AND " - + TABLE_NAME - + " = ? " - + " AND " - + (indexSchema == null ? TABLE_SCHEM + " IS NULL" : TABLE_SCHEM + " = ? ") - + " AND " - + COLUMN_NAME - + " = ? "; - // update view index position - try (PreparedStatement s = globalConnection.prepareStatement(updatePosition)) { - index = 0; - s.setInt(++index, newViewIdxPosition); - s.setString(++index, tenantId); - s.setString(++index, indexName); - if (indexSchema != null) { - s.setString(++index, indexSchema); - } - s.setString(++index, MetaDataUtil.getViewIndexIdColumnName()); - s.executeUpdate(); - } - // update tenant id position - try (PreparedStatement s = globalConnection.prepareStatement(updatePosition)) { - index = 0; - s.setInt(++index, newTenantIdPosition); - s.setString(++index, tenantId); - s.setString(++index, indexName); - if (indexSchema != null) { - s.setString(++index, indexSchema); - } - s.setString(++index, tenantIdColumn); - s.executeUpdate(); - } - globalConnection.commit(); - } else { - viewPTable = PhoenixRuntime.getTable(globalConnection, viewName); - globalConnection.createStatement().execute(String.format(disableIndexDDL, fullIndexName, viewName)); - } - String indexPhysicalTableName = MetaDataUtil.getViewIndexTableName(viewPTable.getPhysicalName().getString()); - if (physicalTables.add(indexPhysicalTableName)) { - final TableName tableName = TableName.valueOf(indexPhysicalTableName); - admin.disableTableAsync(tableName); - checkAndRetry(new RetriableOperation() { - @Override - public boolean checkForCompletion() throws TimeoutException, - IOException { - return admin.isTableDisabled(tableName); - } - @Override - public String getOperationName() { - return "Disable table: " + tableName.getNameAsString(); - } - }); - admin.truncateTable(tableName, false); - } - } - } - } - if (originalScn != null) { - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(originalScn)); - } - toReturn = new PhoenixConnection(globalConnection, this, props); - } catch (SQLException e) { - sqlEx = e; - } finally { - sqlEx = closeConnection(connParam, sqlEx); - sqlEx = closeConnection(globalConnection, sqlEx); - if (sqlEx != null) { - throw sqlEx; - } - } - return toReturn; - } - - - private static SQLException closeConnection(PhoenixConnection conn, SQLException sqlEx) { - SQLException toReturn = sqlEx; - try { - conn.close(); - } catch (SQLException e) { - if (toReturn != null) { - toReturn.setNextException(e); - } else { - toReturn = e; - } - } - return toReturn; - } /** * Forces update of SYSTEM.CATALOG by setting column to existing value http://git-wip-us.apache.org/repos/asf/phoenix/blob/18a47b05/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 1917893..f5e2a0a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -217,7 +217,7 @@ public interface QueryServices extends SQLCloseable { // time to wait before running second index population upsert select (so that any pending batches of rows on region server are also written to index) public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time"; - + public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade"; /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/18a47b05/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 0e7dce9..cb646a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -42,6 +42,7 @@ import static org.apache.phoenix.query.QueryServices.INDEX_POPULATION_SLEEP_TIME import static org.apache.phoenix.query.QueryServices.IS_NAMESPACE_MAPPING_ENABLED; import static org.apache.phoenix.query.QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE; import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB; +import static org.apache.phoenix.query.QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB; @@ -246,6 +247,7 @@ public class QueryServicesOptions { public static final int DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS = (3 * DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD) / 4; public static final int DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE = 10; + public static final boolean DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE = true; @SuppressWarnings("serial") public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() { @@ -322,8 +324,8 @@ public class QueryServicesOptions { .setIfUnset(RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS, DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS) .setIfUnset(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE) .setIfUnset(IS_NAMESPACE_MAPPING_ENABLED, DEFAULT_IS_NAMESPACE_MAPPING_ENABLED) - .setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE); - + .setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE) + .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set // it to 1, so we'll change it. http://git-wip-us.apache.org/repos/asf/phoenix/blob/18a47b05/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index dce9a69..5335fd2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -2628,7 +2628,7 @@ public class MetaDataClient { private void deleteFromStatsTable(List<TableRef> tableRefs, long ts) throws SQLException { Properties props = new Properties(connection.getClientInfo()); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); - Connection conn = DriverManager.getConnection(connection.getURL(), props); + Connection conn = new PhoenixConnection(connection.getQueryServices(), connection, ts); conn.setAutoCommit(true); boolean success = false; SQLException sqlException = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/18a47b05/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 184288e..fba7e06 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -211,6 +211,9 @@ public class PhoenixRuntime { PhoenixConnection conn = null; try { Properties props = new Properties(); + if (execCmd.isLocalIndexUpgrade()) { + props.setProperty(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, "false"); + } conn = DriverManager.getConnection(jdbcUrl, props).unwrap(PhoenixConnection.class); if (execCmd.isMapNamespace()) { String srcTable = execCmd.getSrcTable(); @@ -245,6 +248,8 @@ public class PhoenixRuntime { } else { UpgradeUtil.upgradeDescVarLengthRowKeys(conn, execCmd.getInputFiles(), execCmd.isBypassUpgrade()); } + } else if(execCmd.isLocalIndexUpgrade()) { + UpgradeUtil.upgradeLocalIndexes(conn); } else { for (String inputFile : execCmd.getInputFiles()) { if (inputFile.endsWith(SQL_FILE_EXT)) { @@ -640,7 +645,9 @@ public class PhoenixRuntime { } execCmd.isBypassUpgrade = true; } - execCmd.localIndexUpgrade = cmdLine.hasOption(localIndexUpgradeOption.getOpt()); + if(cmdLine.hasOption(localIndexUpgradeOption.getOpt())) { + execCmd.localIndexUpgrade = true; + } List<String> argList = Lists.newArrayList(cmdLine.getArgList()); if (argList.isEmpty()) { @@ -656,7 +663,7 @@ public class PhoenixRuntime { } } - if (inputFiles.isEmpty() && !execCmd.isUpgrade && !execCmd.isMapNamespace()) { + if (inputFiles.isEmpty() && !execCmd.isUpgrade && !execCmd.isMapNamespace() && !execCmd.isLocalIndexUpgrade()) { usageError("At least one input file must be supplied", options); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18a47b05/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index 3c07d95..f558c72 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -25,9 +25,11 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; @@ -42,6 +44,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID; import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT; import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT; @@ -54,9 +57,11 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeoutException; import java.util.Properties; import java.util.Set; @@ -303,12 +308,21 @@ public class UpgradeUtil { } } - public static void upgradeLocalIndexes(PhoenixConnection metaConnection, boolean createAsyncIndex) throws SQLException, - IOException, org.apache.hadoop.hbase.TableNotFoundException { - HBaseAdmin admin = null; - try { - admin = metaConnection.getQueryServices().getAdmin(); - ResultSet rs = metaConnection.createStatement().executeQuery("SELECT TABLE_SCHEM, TABLE_NAME, DATA_TABLE_NAME FROM SYSTEM.CATALOG " + public static PhoenixConnection upgradeLocalIndexes(PhoenixConnection metaConnection) + throws SQLException, IOException, org.apache.hadoop.hbase.TableNotFoundException { + Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); + Long originalScn = null; + String str = props.getProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB); + if (str != null) { + originalScn = Long.valueOf(str); + } + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP)); + PhoenixConnection globalConnection = null; + PhoenixConnection toReturn = null; + globalConnection = new PhoenixConnection(metaConnection, metaConnection.getQueryServices(), props); + SQLException sqlEx = null; + try (HBaseAdmin admin = globalConnection.getQueryServices().getAdmin()) { + ResultSet rs = globalConnection.createStatement().executeQuery("SELECT TABLE_SCHEM, TABLE_NAME, DATA_TABLE_NAME FROM SYSTEM.CATALOG " + " WHERE COLUMN_NAME IS NULL" + " AND COLUMN_FAMILY IS NULL" + " AND INDEX_TYPE=2"); @@ -353,28 +367,31 @@ public class UpgradeUtil { + (rs.getString(1) == null ? "IS NULL " : "='" + rs.getString(1) + "'") + " and TABLE_NAME='" + rs.getString(2) + "' AND COLUMN_NAME IS NOT NULL"; - ResultSet getColumnsRs = metaConnection.createStatement().executeQuery(getColumns); + ResultSet getColumnsRs = globalConnection.createStatement().executeQuery(getColumns); List<String> indexedColumns = new ArrayList<String>(1); List<String> coveredColumns = new ArrayList<String>(1); while (getColumnsRs.next()) { String column = getColumnsRs.getString(1); String columnName = IndexUtil.getDataColumnName(column); - if (columnName.equals(MetaDataUtil.getViewIndexIdColumnName())) { + if (SchemaUtil.normalizeIdentifier(columnName).equals(MetaDataUtil.getViewIndexIdColumnName())) { continue; } String columnFamily = IndexUtil.getDataColumnFamilyName(column); if (getColumnsRs.getString(2) == null) { if (columnFamily != null && !columnFamily.isEmpty()) { - if (columnFamily.equals(QueryConstants.DEFAULT_COLUMN_FAMILY)) { + if (SchemaUtil.normalizeIdentifier(columnFamily).equals(QueryConstants.DEFAULT_COLUMN_FAMILY)) { indexedColumns.add(columnName); } else { - indexedColumns.add(SchemaUtil.getColumnName(columnFamily, - columnName)); + indexedColumns.add(SchemaUtil.getCaseSensitiveColumnDisplayName( + columnFamily, columnName)); } } } else { - coveredColumns.add(SchemaUtil.getColumnName(columnFamily, columnName)); + coveredColumns.add(SchemaUtil.normalizeIdentifier(columnFamily) + .equals(QueryConstants.DEFAULT_COLUMN_FAMILY) ? columnName + : SchemaUtil.getCaseSensitiveColumnDisplayName( + columnFamily, columnName)); } } StringBuilder createIndex = new StringBuilder("CREATE LOCAL INDEX "); @@ -398,24 +415,192 @@ public class UpgradeUtil { createIndex.append(","); } } - createIndex.append(") ASYNC"); + createIndex.append(")"); } + createIndex.append(" ASYNC"); logger.info("Index creation query is : " + createIndex.toString()); logger.info("Dropping the index " + rs.getString(2) + " to clean up the index details from SYSTEM.CATALOG."); - metaConnection.createStatement().execute( + globalConnection.createStatement().execute( "DROP INDEX IF EXISTS " + rs.getString(2) + " ON " + SchemaUtil.getTableName(rs.getString(1), rs.getString(3))); logger.info("Recreating the index " + rs.getString(2)); - metaConnection.createStatement().execute(createIndex.toString()); + globalConnection.createStatement().execute(createIndex.toString()); logger.info("Created the index " + rs.getString(2)); } - metaConnection.createStatement().execute("DELETE FROM SYSTEM.CATALOG WHERE SUBSTR(TABLE_NAME,0,11)='"+MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+"'"); + globalConnection.createStatement().execute("DELETE FROM SYSTEM.CATALOG WHERE SUBSTR(TABLE_NAME,0,11)='"+MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+"'"); + if (originalScn != null) { + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(originalScn)); + } + toReturn = new PhoenixConnection(globalConnection, metaConnection.getQueryServices(), props); + } catch (SQLException e) { + sqlEx = e; } finally { - if (admin != null) admin.close(); + sqlEx = closeConnection(metaConnection, sqlEx); + sqlEx = closeConnection(globalConnection, sqlEx); + if (sqlEx != null) { + throw sqlEx; + } } + return toReturn; + } + + public static PhoenixConnection disableViewIndexes(PhoenixConnection connParam) throws SQLException, IOException, InterruptedException, TimeoutException { + Properties props = PropertiesUtil.deepCopy(connParam.getClientInfo()); + Long originalScn = null; + String str = props.getProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB); + if (str != null) { + originalScn = Long.valueOf(str); + } + // don't use the passed timestamp as scn because we want to query all view indexes up to now. + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP)); + Set<String> physicalTables = new HashSet<>(); + SQLException sqlEx = null; + PhoenixConnection globalConnection = null; + PhoenixConnection toReturn = null; + try { + globalConnection = new PhoenixConnection(connParam, connParam.getQueryServices(), props); + String tenantId = null; + try (HBaseAdmin admin = globalConnection.getQueryServices().getAdmin()) { + String fetchViewIndexes = "SELECT " + TENANT_ID + ", " + TABLE_SCHEM + ", " + TABLE_NAME + + ", " + DATA_TABLE_NAME + " FROM " + SYSTEM_CATALOG_NAME + " WHERE " + VIEW_INDEX_ID + + " IS NOT NULL AND " + INDEX_TYPE + "<>" + IndexType.LOCAL.getSerializedValue(); + String disableIndexDDL = "ALTER INDEX %s ON %s DISABLE"; + try (ResultSet rs = globalConnection.createStatement().executeQuery(fetchViewIndexes)) { + while (rs.next()) { + tenantId = rs.getString(1); + String indexSchema = rs.getString(2); + String indexName = rs.getString(3); + String viewName = rs.getString(4); + String fullIndexName = SchemaUtil.getTableName(indexSchema, indexName); + PTable viewPTable = null; + // Disable the view index and truncate the underlying hbase table. + // Users would need to rebuild the view indexes. + if (tenantId != null && !tenantId.isEmpty()) { + Properties newProps = PropertiesUtil.deepCopy(globalConnection.getClientInfo()); + newProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + PTable indexPTable = null; + try (PhoenixConnection tenantConnection = new PhoenixConnection(globalConnection, globalConnection.getQueryServices(), newProps)) { + viewPTable = PhoenixRuntime.getTable(tenantConnection, viewName); + tenantConnection.createStatement().execute(String.format(disableIndexDDL, fullIndexName, viewName)); + indexPTable = PhoenixRuntime.getTable(tenantConnection, fullIndexName); + } + + int offset = indexPTable.getBucketNum() != null ? 1 : 0; + int existingTenantIdPosition = ++offset; // positions are stored 1 based + int existingViewIdxIdPosition = ++offset; + int newTenantIdPosition = existingViewIdxIdPosition; + int newViewIdxPosition = existingTenantIdPosition; + String tenantIdColumn = indexPTable.getColumns().get(existingTenantIdPosition - 1).getName().getString(); + int index = 0; + String updatePosition = + "UPSERT INTO " + + SYSTEM_CATALOG_NAME + + " ( " + + TENANT_ID + + "," + + TABLE_SCHEM + + "," + + TABLE_NAME + + "," + + COLUMN_NAME + + "," + + COLUMN_FAMILY + + "," + + ORDINAL_POSITION + + ") SELECT " + + TENANT_ID + + "," + + TABLE_SCHEM + + "," + + TABLE_NAME + + "," + + COLUMN_NAME + + "," + + COLUMN_FAMILY + + "," + + "?" + + " FROM " + + SYSTEM_CATALOG_NAME + + " WHERE " + + TENANT_ID + + " = ? " + + " AND " + + TABLE_NAME + + " = ? " + + " AND " + + (indexSchema == null ? TABLE_SCHEM + " IS NULL" : TABLE_SCHEM + " = ? ") + + " AND " + + COLUMN_NAME + + " = ? "; + // update view index position + try (PreparedStatement s = globalConnection.prepareStatement(updatePosition)) { + index = 0; + s.setInt(++index, newViewIdxPosition); + s.setString(++index, tenantId); + s.setString(++index, indexName); + if (indexSchema != null) { + s.setString(++index, indexSchema); + } + s.setString(++index, MetaDataUtil.getViewIndexIdColumnName()); + s.executeUpdate(); + } + // update tenant id position + try (PreparedStatement s = globalConnection.prepareStatement(updatePosition)) { + index = 0; + s.setInt(++index, newTenantIdPosition); + s.setString(++index, tenantId); + s.setString(++index, indexName); + if (indexSchema != null) { + s.setString(++index, indexSchema); + } + s.setString(++index, tenantIdColumn); + s.executeUpdate(); + } + globalConnection.commit(); + } else { + viewPTable = PhoenixRuntime.getTable(globalConnection, viewName); + globalConnection.createStatement().execute(String.format(disableIndexDDL, fullIndexName, viewName)); + } + String indexPhysicalTableName = MetaDataUtil.getViewIndexTableName(viewPTable.getPhysicalName().getString()); + if (physicalTables.add(indexPhysicalTableName)) { + final TableName tableName = TableName.valueOf(indexPhysicalTableName); + admin.disableTable(tableName); + admin.truncateTable(tableName, false); + } + } + } + } + if (originalScn != null) { + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(originalScn)); + } + toReturn = new PhoenixConnection(globalConnection, globalConnection.getQueryServices(), props); + } catch (SQLException e) { + sqlEx = e; + } finally { + sqlEx = closeConnection(connParam, sqlEx); + sqlEx = closeConnection(globalConnection, sqlEx); + if (sqlEx != null) { + throw sqlEx; + } + } + return toReturn; } + + public static SQLException closeConnection(PhoenixConnection conn, SQLException sqlEx) { + SQLException toReturn = sqlEx; + try { + conn.close(); + } catch (SQLException e) { + if (toReturn != null) { + toReturn.setNextException(e); + } else { + toReturn = e; + } + } + return toReturn; + } @SuppressWarnings("deprecation") public static boolean upgradeSequenceTable(PhoenixConnection conn, int nSaltBuckets, PTable oldTable) throws SQLException { logger.info("Upgrading SYSTEM.SEQUENCE table");
