Repository: phoenix Updated Branches: refs/heads/master 18da4a046 -> 10909ae50
http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 34cedce..75b094b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1114,7 +1114,7 @@ public class MetaDataClient { // connection so that our new index table is visible. Properties props = new Properties(connection.getClientInfo()); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(connection.getSCN()+1)); - PhoenixConnection conn = DriverManager.getConnection(connection.getURL(), props).unwrap(PhoenixConnection.class); + PhoenixConnection conn = new PhoenixConnection(connection, connection.getQueryServices(), props); MetaDataClient newClientAtNextTimeStamp = new MetaDataClient(conn); // Re-resolve the tableRef from the now newer connection @@ -1419,7 +1419,7 @@ public class MetaDataClient { } if (!SchemaUtil.isPKColumn(col) && col.getViewConstant() == null) { // Need to re-create ColumnName, since the above one won't have the column family name - colName = ColumnName.caseSensitiveColumnName(col.getFamilyName().getString(), IndexUtil.getIndexColumnName(col)); + colName = ColumnName.caseSensitiveColumnName(isLocalIndex?IndexUtil.getLocalIndexColumnFamily(col.getFamilyName().getString()):col.getFamilyName().getString(), IndexUtil.getIndexColumnName(col)); columnDefs.add(FACTORY.columnDef(colName, col.getDataType().getSqlTypeName(), col.isNullable(), col.getMaxLength(), col.getScale(), false, col.getSortOrder(), null, col.isRowTimestamp())); } } @@ -1677,6 +1677,7 @@ public class MetaDataClient { boolean isNamespaceMapped = parent == null ? SchemaUtil.isNamespaceMappingEnabled(tableType, connection.getQueryServices().getProps()) : parent.isNamespaceMapped(); + boolean isLocalIndex = indexType == IndexType.LOCAL; if (parent != null && tableType == PTableType.INDEX) { timestamp = TransactionUtil.getTableTimestamp(connection, transactional); storeNulls = parent.getStoreNulls(); @@ -1686,17 +1687,21 @@ public class MetaDataClient { // TODO: Can we support a multi-tenant index directly on a multi-tenant // table instead of only a view? We don't have anywhere to put the link // from the table to the index, though. - if (indexType == IndexType.LOCAL || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) { + if (isLocalIndex || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) { PName physicalName = parent.getPhysicalName(); saltBucketNum = parent.getBucketNum(); - addSaltColumn = (saltBucketNum != null && indexType != IndexType.LOCAL); + addSaltColumn = (saltBucketNum != null && !isLocalIndex); defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString(); - if (indexType == IndexType.LOCAL) { + if (isLocalIndex) { + defaultFamilyName = + parent.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY + : IndexUtil.getLocalIndexColumnFamily(parent.getDefaultFamilyName().getString()); saltBucketNum = null; // Set physical name of local index table - physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getLocalIndexPhysicalName(physicalName.getBytes()))); + physicalNames = Collections.singletonList(PNameFactory.newName(physicalName.getBytes())); } else { + defaultFamilyName = parent.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : parent.getDefaultFamilyName().getString(); // Set physical name of view index table physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes()))); } @@ -2050,7 +2055,9 @@ public class MetaDataClient { .build().buildException(); } if (column.getFamilyName() != null) { - familyNames.put(column.getFamilyName().getString(),column.getFamilyName()); + familyNames.put( + IndexUtil.getActualColumnFamilyName(column.getFamilyName().getString()), + column.getFamilyName()); } } // We need a PK definition for a TABLE or mapped VIEW @@ -2100,7 +2107,9 @@ public class MetaDataClient { throwIfInsufficientColumns(schemaName, tableName, pkColumns, saltBucketNum!=null, multiTenant); for (PName familyName : familyNames.values()) { - Collection<Pair<String,Object>> props = statement.getProps().get(familyName.getString()); + String fam = familyName.getString(); + Collection<Pair<String, Object>> props = + statement.getProps().get(IndexUtil.getActualColumnFamilyName(fam)); if (props.isEmpty()) { familyPropList.add(new Pair<byte[],Map<String,Object>>(familyName.getBytes(),commonFamilyProps)); } else { @@ -2121,7 +2130,10 @@ public class MetaDataClient { if (familyNames.isEmpty()) { //if there are no family names, use the default column family name. This also takes care of the case when //the table ddl has only PK cols present (which means familyNames is empty). - byte[] cf = defaultFamilyName == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : Bytes.toBytes(defaultFamilyName); + byte[] cf = + defaultFamilyName == null ? (!isLocalIndex? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES + : QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES) + : Bytes.toBytes(defaultFamilyName); familyPropList.add(new Pair<byte[],Map<String,Object>>(cf, commonFamilyProps)); } @@ -2297,11 +2309,7 @@ public class MetaDataClient { */ Collections.reverse(tableMetaData); - if (parent != null && tableType == PTableType.INDEX && indexType == IndexType.LOCAL) { - tableProps.put(MetaDataUtil.PARENT_TABLE_KEY, parent.getPhysicalName().getString()); - tableProps.put(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME, Boolean.TRUE); - splits = getSplitKeys(connection.getQueryServices().getAllTableRegions(parent.getPhysicalName().getBytes())); - } else { + if (indexType != IndexType.LOCAL) { splits = SchemaUtil.processSplits(splits, pkColumns, saltBucketNum, connection.getQueryServices().getProps().getBoolean( QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER)); } @@ -2566,7 +2574,7 @@ public class MetaDataClient { // All multi-tenant tables have a view index table, so no need to check in that case if (parentTableName == null) { for (PTable index : table.getIndexes()) { - if (MetaDataUtil.isLocalIndex(index.getPhysicalName().getString())) { + if (index.getIndexType() == IndexType.LOCAL) { hasLocalIndexTable = true; } } @@ -2590,19 +2598,6 @@ public class MetaDataClient { table.getColumnFamilies()); tableRefs.add(new TableRef(null, viewIndexTable, ts, false)); } - if (hasLocalIndexTable) { - String localIndexSchemaName = null; - String localIndexTableName = null; - if (schemaName != null) { - localIndexSchemaName = MetaDataUtil.getLocalIndexTableName(schemaName); - localIndexTableName = tableName; - } else { - localIndexTableName = MetaDataUtil.getLocalIndexTableName(tableName); - } - PTable localIndexTable = new PTableImpl(null, localIndexSchemaName, localIndexTableName, - ts, Collections.<PColumnFamily> emptyList()); - tableRefs.add(new TableRef(null, localIndexTable, ts, false)); - } } tableRefs.add(new TableRef(null, table, ts, false)); // TODO: Let the standard mutable secondary index maintenance handle this? @@ -2639,6 +2634,17 @@ public class MetaDataClient { buf.append("'" + ref.getTable().getPhysicalName().getString() + "',"); } buf.setCharAt(buf.length() - 1, ')'); + if(tableRefs.get(0).getTable().getIndexType()==IndexType.LOCAL) { + buf.append(" AND COLUMN_FAMILY IN("); + if (tableRefs.get(0).getTable().getColumnFamilies().isEmpty()) { + buf.append("'" + QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY + "',"); + } else { + for(PColumnFamily cf : tableRefs.get(0).getTable().getColumnFamilies()) { + buf.append("'" + cf.getName().getString() + "',"); + } + } + buf.setCharAt(buf.length() - 1, ')'); + } conn.createStatement().execute(buf.toString()); success = true; } catch (SQLException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java index e231342..2ce5160 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java @@ -21,6 +21,7 @@ import java.util.Objects; import org.apache.phoenix.compile.TupleProjectionCompiler; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; @@ -101,7 +102,7 @@ public class TableRef { String defaultFamilyName = table.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : table.getDefaultFamilyName().getString(); // Translate to the data table column name String dataFamilyName = isIndex ? IndexUtil.getDataColumnFamilyName(name) : column.getFamilyName().getString() ; - cf = defaultFamilyName.equals(dataFamilyName) ? null : dataFamilyName; + cf = (table.getIndexType()==IndexType.LOCAL? IndexUtil.getActualColumnFamilyName(defaultFamilyName):defaultFamilyName).equals(dataFamilyName) ? null : dataFamilyName; cq = isIndex ? IndexUtil.getDataColumnName(name) : name; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 1f87774..5532d71 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.util; +import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX; import static org.apache.phoenix.util.PhoenixRuntime.getTable; import java.io.ByteArrayInputStream; @@ -24,6 +25,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -40,10 +42,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.ColumnResolver; @@ -141,24 +141,37 @@ public class IndexUtil { return name.substring(0,name.indexOf(INDEX_COLUMN_NAME_SEP)); } + public static String getActualColumnFamilyName(String name) { + if(name.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + return name.substring(LOCAL_INDEX_COLUMN_FAMILY_PREFIX.length()); + } + return name; + } + public static String getCaseSensitiveDataColumnFullName(String name) { int index = name.indexOf(INDEX_COLUMN_NAME_SEP) ; - return SchemaUtil.getCaseSensitiveColumnDisplayName(name.substring(0, index), name.substring(index+1)); + return SchemaUtil.getCaseSensitiveColumnDisplayName(getDataColumnFamilyName(name), name.substring(index+1)); } public static String getIndexColumnName(String dataColumnFamilyName, String dataColumnName) { - return (dataColumnFamilyName == null ? "" : dataColumnFamilyName) + INDEX_COLUMN_NAME_SEP + dataColumnName; + return (dataColumnFamilyName == null ? "" : dataColumnFamilyName) + INDEX_COLUMN_NAME_SEP + + dataColumnName; } public static byte[] getIndexColumnName(byte[] dataColumnFamilyName, byte[] dataColumnName) { return ByteUtil.concat(dataColumnFamilyName == null ? ByteUtil.EMPTY_BYTE_ARRAY : dataColumnFamilyName, INDEX_COLUMN_NAME_SEP_BYTES, dataColumnName); } - + public static String getIndexColumnName(PColumn dataColumn) { String dataColumnFamilyName = SchemaUtil.isPKColumn(dataColumn) ? null : dataColumn.getFamilyName().getString(); return getIndexColumnName(dataColumnFamilyName, dataColumn.getName().getString()); } + public static String getLocalIndexColumnFamily(String dataColumnFamilyName) { + return dataColumnFamilyName == null ? null + : QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + dataColumnFamilyName; + } + public static PColumn getDataColumn(PTable dataTable, String indexColumnName) { int pos = indexColumnName.indexOf(INDEX_COLUMN_NAME_SEP); if (pos < 0) { @@ -173,7 +186,7 @@ public class IndexUtil { } PColumnFamily family; try { - family = dataTable.getColumnFamily(indexColumnName.substring(0, pos)); + family = dataTable.getColumnFamily(getDataColumnFamilyName(indexColumnName)); } catch (ColumnFamilyNotFoundException e) { throw new IllegalArgumentException("Could not find column family \"" + indexColumnName.substring(0, pos) + "\" in index column name of \"" + indexColumnName + "\"", e); } @@ -222,7 +235,14 @@ public class IndexUtil { for (final Mutation dataMutation : dataMutations) { long ts = MetaDataUtil.getClientTimeStamp(dataMutation); ptr.set(dataMutation.getRow()); - Delete delete = maintainer.buildDeleteMutation(kvBuilder, ptr, ts); + byte[] regionStartKey = null; + byte[] regionEndkey = null; + if(maintainer.isLocalIndex()) { + HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(), dataMutation.getRow()); + regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); + regionEndkey = tableRegionLocation.getRegionInfo().getEndKey(); + } + Delete delete = maintainer.buildDeleteMutation(kvBuilder, null, ptr, Collections.<KeyValue>emptyList(), ts, regionStartKey, regionEndkey); delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY)); indexMutations.add(delete); } @@ -336,55 +356,6 @@ public class IndexUtil { }); } - public static Region getIndexRegion(RegionCoprocessorEnvironment environment) - throws IOException { - Region dataRegion = environment.getRegion(); - return getIndexRegion(dataRegion, environment.getRegionServerServices()); - } - - public static Region - getIndexRegion(Region dataRegion, RegionServerCoprocessorEnvironment env) - throws IOException { - return getIndexRegion(dataRegion, env.getRegionServerServices()); - } - - public static Region getDataRegion(RegionCoprocessorEnvironment env) throws IOException { - Region indexRegion = env.getRegion(); - return getDataRegion(indexRegion, env.getRegionServerServices()); - } - - public static Region - getDataRegion(Region indexRegion, RegionServerCoprocessorEnvironment env) - throws IOException { - return getDataRegion(indexRegion, env.getRegionServerServices()); - } - - public static Region getIndexRegion(Region dataRegion, RegionServerServices rss) throws IOException { - TableName indexTableName = - TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(dataRegion.getTableDesc() - .getName())); - List<Region> onlineRegions = rss.getOnlineRegions(indexTableName); - for(Region indexRegion : onlineRegions) { - if (Bytes.compareTo(dataRegion.getRegionInfo().getStartKey(), - indexRegion.getRegionInfo().getStartKey()) == 0) { - return indexRegion; - } - } - return null; - } - - public static Region getDataRegion(Region indexRegion, RegionServerServices rss) throws IOException { - TableName dataTableName = TableName.valueOf(MetaDataUtil.getUserTableName(indexRegion.getTableDesc().getNameAsString())); - List<Region> onlineRegions = rss.getOnlineRegions(dataTableName); - for(Region region : onlineRegions) { - if (Bytes.compareTo(indexRegion.getRegionInfo().getStartKey(), - region.getRegionInfo().getStartKey()) == 0) { - return region; - } - } - return null; - } - public static ColumnReference[] deserializeDataTableColumnsToJoin(Scan scan) { byte[] columnsBytes = scan.getAttribute(BaseScannerRegionObserver.DATA_TABLE_COLUMNS_TO_JOIN); if (columnsBytes == null) return null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java index 116b62b..b65677c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -430,15 +431,35 @@ public class MetaDataUtil { } public static boolean hasLocalIndexTable(PhoenixConnection connection, byte[] physicalTableName) throws SQLException { - byte[] physicalIndexName = MetaDataUtil.getLocalIndexPhysicalName(physicalTableName); try { - HTableDescriptor desc = connection.getQueryServices().getTableDescriptor(physicalIndexName); - return desc != null && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(desc.getValue(IS_LOCAL_INDEX_TABLE_PROP_BYTES))); + HTableDescriptor desc = connection.getQueryServices().getTableDescriptor(physicalTableName); + if(desc == null ) return false; + return hasLocalIndexColumnFamily(desc); } catch (TableNotFoundException e) { return false; } } + public static boolean hasLocalIndexColumnFamily(HTableDescriptor desc) { + for (HColumnDescriptor cf : desc.getColumnFamilies()) { + if (cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + return true; + } + } + return false; + } + + public static List<byte[]> getNonLocalIndexColumnFamilies(HTableDescriptor desc) { + List<byte[]> families = new ArrayList<byte[]>(desc.getColumnFamilies().length); + for (HColumnDescriptor cf : desc.getColumnFamilies()) { + if (!cf.getNameAsString().startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + families.add(cf.getName()); + } + } + return families; + } + + public static void deleteViewIndexSequences(PhoenixConnection connection, PName name) throws SQLException { String schemaName = getViewIndexSchemaName(name); connection.createStatement().executeUpdate("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index a8981a4..c0fc765 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -517,6 +517,7 @@ public class PhoenixRuntime { private boolean isBypassUpgrade; private boolean mapNamespace; private String srcTable; + private boolean localIndexUpgrade; /** * Factory method to build up an {@code ExecutionCommand} based on supplied parameters. @@ -558,6 +559,9 @@ public class PhoenixRuntime { Option mapNamespaceOption = new Option("m", "map-namespace", true, "Used to map table to a namespace matching with schema, require "+ QueryServices.IS_NAMESPACE_MAPPING_ENABLED + " to be enabled"); + Option localIndexUpgradeOption = new Option("l", "local-index-upgrade", false, + "Used to upgrade local index data by moving index data from separate table to " + + "separate column families in the same table."); Options options = new Options(); options.addOption(tableOption); options.addOption(headerOption); @@ -569,6 +573,7 @@ public class PhoenixRuntime { options.addOption(upgradeOption); options.addOption(bypassUpgradeOption); options.addOption(mapNamespaceOption); + options.addOption(localIndexUpgradeOption); CommandLineParser parser = new PosixParser(); CommandLine cmdLine = null; @@ -622,7 +627,7 @@ public class PhoenixRuntime { } execCmd.isBypassUpgrade = true; } - + execCmd.localIndexUpgrade = cmdLine.hasOption(localIndexUpgradeOption.getOpt()); List<String> argList = Lists.newArrayList(cmdLine.getArgList()); if (argList.isEmpty()) { @@ -737,6 +742,10 @@ public class PhoenixRuntime { public String getSrcTable() { return srcTable; } + + public boolean isLocalIndexUpgrade() { + return localIndexUpgrade; + } } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index 8d00b2b..474cf34 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -70,6 +70,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter; import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.MetaDataEndpointImpl; @@ -292,6 +294,119 @@ public class UpgradeUtil { } } } + + public static void upgradeLocalIndexes(PhoenixConnection metaConnection, boolean createAsyncIndex) throws SQLException, + IOException, org.apache.hadoop.hbase.TableNotFoundException { + HBaseAdmin admin = null; + try { + admin = metaConnection.getQueryServices().getAdmin(); + ResultSet rs = metaConnection.createStatement().executeQuery("SELECT TABLE_SCHEM, TABLE_NAME, DATA_TABLE_NAME FROM SYSTEM.CATALOG " + + " WHERE COLUMN_NAME IS NULL" + + " AND COLUMN_FAMILY IS NULL" + + " AND INDEX_TYPE=2"); + boolean droppedLocalIndexes = false; + while (rs.next()) { + if(!droppedLocalIndexes) { + HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*"); + String localIndexSplitter = LocalIndexSplitter.class.getName(); + for (HTableDescriptor table : localIndexTables) { + HTableDescriptor dataTableDesc = admin.getTableDescriptor(TableName.valueOf(MetaDataUtil.getUserTableName(table.getNameAsString()))); + HColumnDescriptor[] columnFamilies = dataTableDesc.getColumnFamilies(); + boolean modifyTable = false; + for(HColumnDescriptor cf : columnFamilies) { + String localIndexCf = QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX+cf.getNameAsString(); + if(dataTableDesc.getFamily(Bytes.toBytes(localIndexCf))==null){ + HColumnDescriptor colDef = + new HColumnDescriptor(localIndexCf); + for(Entry<ImmutableBytesWritable, ImmutableBytesWritable>keyValue: cf.getValues().entrySet()){ + colDef.setValue(keyValue.getKey().copyBytes(), keyValue.getValue().copyBytes()); + } + dataTableDesc.addFamily(colDef); + modifyTable = true; + } + } + List<String> coprocessors = dataTableDesc.getCoprocessors(); + for(String coprocessor: coprocessors) { + if(coprocessor.equals(localIndexSplitter)) { + dataTableDesc.removeCoprocessor(localIndexSplitter); + modifyTable = true; + } + } + if(modifyTable) { + admin.modifyTable(dataTableDesc.getName(), dataTableDesc); + } + } + admin.disableTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*"); + admin.deleteTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*"); + droppedLocalIndexes = true; + } + String getColumns = + "SELECT COLUMN_NAME, COLUMN_FAMILY FROM SYSTEM.CATALOG WHERE TABLE_SCHEM " + + (rs.getString(1) == null ? "IS NULL " : "='" + rs.getString(1) + + "'") + " and TABLE_NAME='" + rs.getString(2) + + "' AND COLUMN_NAME IS NOT NULL"; + ResultSet getColumnsRs = metaConnection.createStatement().executeQuery(getColumns); + List<String> indexedColumns = new ArrayList<String>(1); + List<String> coveredColumns = new ArrayList<String>(1); + + while (getColumnsRs.next()) { + String column = getColumnsRs.getString(1); + String columnName = IndexUtil.getDataColumnName(column); + if (columnName.equals(MetaDataUtil.getViewIndexIdColumnName())) { + continue; + } + String columnFamily = IndexUtil.getDataColumnFamilyName(column); + if (getColumnsRs.getString(2) == null) { + if (columnFamily != null && !columnFamily.isEmpty()) { + if (columnFamily.equals(QueryConstants.DEFAULT_COLUMN_FAMILY)) { + indexedColumns.add(columnName); + } else { + indexedColumns.add(SchemaUtil.getColumnName(columnFamily, + columnName)); + } + } + } else { + coveredColumns.add(SchemaUtil.getColumnName(columnFamily, columnName)); + } + } + StringBuilder createIndex = new StringBuilder("CREATE LOCAL INDEX "); + createIndex.append(rs.getString(2)); + createIndex.append(" ON "); + createIndex.append(SchemaUtil.getTableName(rs.getString(1), rs.getString(3))); + createIndex.append("("); + for (int i = 0; i < indexedColumns.size(); i++) { + createIndex.append(indexedColumns.get(i)); + if (i < indexedColumns.size() - 1) { + createIndex.append(","); + } + } + createIndex.append(")"); + + if (!coveredColumns.isEmpty()) { + createIndex.append(" INCLUDE("); + for (int i = 0; i < coveredColumns.size(); i++) { + createIndex.append(coveredColumns.get(i)); + if (i < coveredColumns.size() - 1) { + createIndex.append(","); + } + } + createIndex.append(") ASYNC"); + } + logger.info("Index creation query is : " + createIndex.toString()); + logger.info("Dropping the index " + rs.getString(2) + + " to clean up the index details from SYSTEM.CATALOG."); + metaConnection.createStatement().execute( + "DROP INDEX IF EXISTS " + rs.getString(2) + " ON " + + SchemaUtil.getTableName(rs.getString(1), rs.getString(3))); + logger.info("Recreating the index " + rs.getString(2)); + metaConnection.createStatement().execute(createIndex.toString()); + logger.info("Created the index " + rs.getString(2)); + } + metaConnection.createStatement().execute("DELETE FROM SYSTEM.CATALOG WHERE SUBSTR(TABLE_NAME,0,11)='"+MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+"'"); + } finally { + if (admin != null) admin.close(); + } + } @SuppressWarnings("deprecation") public static boolean upgradeSequenceTable(PhoenixConnection conn, int nSaltBuckets, PTable oldTable) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java index 88bf7fc..1f1e37e 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java @@ -108,7 +108,7 @@ public class TestParalleIndexWriter { // setup the writer and failure policy ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); writer.setup(factory, exec, abort, stop, 1); - writer.write(indexUpdates); + writer.write(indexUpdates, true); assertTrue("Writer returned before the table batch completed! Likely a race condition tripped", completed[0]); writer.stop(this.test.getTableNameString() + " finished"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java index ee5e1d5..8eece3b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java @@ -108,7 +108,7 @@ public class TestParalleWriterIndexCommitter { // setup the writer and failure policy ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion()); writer.setup(factory, exec, abort, stop, 1); - writer.write(indexUpdates); + writer.write(indexUpdates, true); assertTrue("Writer returned before the table batch completed! Likely a race condition tripped", completed[0]); writer.stop(this.test.getTableNameString() + " finished"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/10909ae5/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 294c82f..fdb4002 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -128,22 +128,16 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; -import org.apache.hadoop.hbase.master.LoadBalancer; -import org.apache.hadoop.hbase.regionserver.LocalIndexMerger; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseClientManagedTimeIT; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; -import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer; -import org.apache.phoenix.hbase.index.master.IndexMasterObserver; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixDriver; @@ -748,11 +742,6 @@ public abstract class BaseTest { conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5); conf.setInt("hbase.regionserver.metahandler.count", 2); conf.setInt(HConstants.MASTER_HANDLER_COUNT, 2); - conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, IndexMasterObserver.class.getName()); - conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, IndexLoadBalancer.class, - LoadBalancer.class); - conf.setClass("hbase.coprocessor.regionserver.classes", LocalIndexMerger.class, - RegionServerObserver.class) ; conf.setInt("dfs.namenode.handler.count", 2); conf.setInt("dfs.namenode.service.handler.count", 2); conf.setInt("dfs.datanode.handler.count", 2);
