Repository: phoenix
Updated Branches:
  refs/heads/master 6d7f07079 -> 2da5ff2ad
PHOENIX-2999 Upgrading Multi-tenant table to map with namespace using upgradeUtil

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2da5ff2a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2da5ff2a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2da5ff2a

Branch: refs/heads/master
Commit: 2da5ff2ad0d4573c14b2214378df9f25286fa8fb
Parents: 6d7f070
Author: Ankit Singhal <[email protected]>
Authored: Thu Jul 7 16:25:57 2016 +0530
Committer: Ankit Singhal <[email protected]>
Committed: Thu Jul 7 16:25:57 2016 +0530

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/UpgradeIT.java   | 125 +++++++++++++-
 .../java/org/apache/phoenix/end2end/ViewIT.java |   2 +
 .../coprocessor/MetaDataEndpointImpl.java       |   7 +-
 .../query/ConnectionQueryServicesImpl.java      |   8 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   8 +-
 .../org/apache/phoenix/util/MetaDataUtil.java   |  25 +--
 .../org/apache/phoenix/util/PhoenixRuntime.java |   9 +-
 .../org/apache/phoenix/util/UpgradeUtil.java    | 169 ++++++++++++-------
 8 files changed, 251 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2da5ff2a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index e6d0b66..6722b67 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -33,13 +33,13 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Properties;
-import java.util.Set;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -186,10 +186,10 @@ public class UpgradeIT extends BaseHBaseManagedTimeIT {
             admin.close();
             PhoenixConnection phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
             UpgradeUtil.upgradeTable(phxConn, phoenixFullTableName);
-            Set<String> viewNames = MetaDataUtil.getViewNames(phxConn, phoenixFullTableName);
-            for (String viewName : viewNames) {
-                UpgradeUtil.upgradeTable(phxConn, viewName);
-            }
+            UpgradeUtil.mapChildViewsToNamespace(phxConn, phoenixFullTableName,props);
+            phxConn.close();
+            props = new Properties();
+            phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
             admin = phxConn.getQueryServices().getAdmin();
             String hbaseTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(phoenixFullTableName), true)
                     .getNameAsString();
@@ -237,6 +237,121 @@ public class UpgradeIT extends BaseHBaseManagedTimeIT {
         }
     }
 
+    @Test
+    public void testMapMultiTenantTableToNamespaceDuringUpgrade() throws SQLException, SnapshotCreationException,
+            IllegalArgumentException, IOException, InterruptedException {
+        String[] strings = new String[] { "a", "b", "c", "d" };
+        String schemaName = "TEST";
+        String phoenixFullTableName = schemaName + ".S_NEW1";
+        String hbaseTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(phoenixFullTableName), true)
+                .getNameAsString();
+        String indexName = "IDX";
+        String[] tableNames = new String[] { phoenixFullTableName, "diff.v1", "test.v1", "v1" };
+        String[] viewIndexes = new String[] { "test.v_idx", "diff.v_idx" };
+        String[] tenantViewIndexes = new String[] { "test.v1_idx", "diff.v1_idx" };
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + phoenixFullTableName
+                    + "(k VARCHAR not null, v INTEGER not null, f INTEGER, g INTEGER NULL, h INTEGER NULL CONSTRAINT pk PRIMARY KEY(k,v)) MULTI_TENANT=true");
+            PreparedStatement upsertStmt = conn
+                    .prepareStatement("UPSERT INTO " + phoenixFullTableName + " VALUES(?, ?, 0, 0, 0)");
+            int i = 1;
+            for (String str : strings) {
+                upsertStmt.setString(1, str);
+                upsertStmt.setInt(2, i++);
+                upsertStmt.execute();
+            }
+            conn.commit();
+
+            // creating global index
+            conn.createStatement().execute("create index " + indexName + " on " + phoenixFullTableName + "(f)");
+            // creating view in schema 'diff'
+            conn.createStatement().execute("CREATE VIEW diff.v (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // creating view in schema 'test'
+            conn.createStatement().execute("CREATE VIEW test.v (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            conn.createStatement().execute("CREATE VIEW v (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // Creating index on views
+            conn.createStatement().execute("create local index v_idx on diff.v(col)");
+            conn.createStatement().execute("create local index v_idx on test.v(col)");
+        }
+        Properties props = new Properties();
+        String tenantId = "a";
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            PreparedStatement upsertStmt = conn
+                    .prepareStatement("UPSERT INTO " + phoenixFullTableName + "(k,v,f,g,h) VALUES(?, ?, 0, 0, 0)");
+            int i = 1;
+            for (String str : strings) {
+                upsertStmt.setString(1, str);
+                upsertStmt.setInt(2, i++);
+                upsertStmt.execute();
+            }
+            conn.commit();
+            // creating view in schema 'diff'
+            conn.createStatement()
+                    .execute("CREATE VIEW diff.v1 (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // creating view in schema 'test'
+            conn.createStatement()
+                    .execute("CREATE VIEW test.v1 (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            conn.createStatement().execute("CREATE VIEW v1 (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
+            // Creating index on views
+            conn.createStatement().execute("create index v1_idx on diff.v1(col)");
+            conn.createStatement().execute("create index v1_idx on test.v1(col)");
+        }
+
+        props = new Properties();
+        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
+        props.setProperty(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, Boolean.toString(false));
+        PhoenixConnection phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+        UpgradeUtil.upgradeTable(phxConn, phoenixFullTableName);
+        UpgradeUtil.mapChildViewsToNamespace(phxConn,phoenixFullTableName,props);
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+        int i = 1;
+        String indexPhysicalTableName = Bytes
+                .toString(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(hbaseTableName)));
+        // validate data with tenant
+        for (String tableName : tableNames) {
+            assertTableUsed(phxConn, tableName, hbaseTableName);
+            ResultSet rs = phxConn.createStatement().executeQuery("select * from " + tableName);
+            assertTrue(rs.next());
+            do {
+                assertEquals(i++, rs.getInt(1));
+            } while (rs.next());
+            i = 1;
+        }
+        // validate view Index data
+        for (String viewIndex : tenantViewIndexes) {
+            assertTableUsed(phxConn, viewIndex, indexPhysicalTableName);
+            ResultSet rs = phxConn.createStatement().executeQuery("select * from " + viewIndex);
+            assertTrue(rs.next());
+            do {
+                assertEquals(i++, rs.getInt(2));
+            } while (rs.next());
+            i = 1;
+        }
+        phxConn.close();
+        props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+        phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+
+        // validate view Index data
+        for (String viewIndex : viewIndexes) {
+            assertTableUsed(phxConn, viewIndex, hbaseTableName);
+            ResultSet rs = phxConn.createStatement().executeQuery("select * from " + viewIndex);
+            for (String str : strings) {
+                assertTrue(rs.next());
+                assertEquals(str, rs.getString(1));
+            }
+        }
+        phxConn.close();
+    }
+
+    public void assertTableUsed(Connection conn, String phoenixTableName, String hbaseTableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + phoenixTableName);
+        assertTrue(rs.next());
+        assertTrue(rs.getString(1).contains(hbaseTableName));
+    }
+
     @Test
     public void testSettingBaseColumnCountForMultipleViewsOnTable() throws Exception {


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2da5ff2a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
index ab58840..35e1c62 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
@@ -410,6 +410,8 @@ public class ViewIT extends BaseViewIT {
         conn.createStatement().execute(ddl);
         ddl = "CREATE VIEW s2.v1 (v2 VARCHAR) AS SELECT * FROM " + fullTableName + " WHERE k > 5";
         conn.createStatement().execute(ddl);
+        ddl = "CREATE LOCAL INDEX idx on s2.v1(v2)";
+        conn.createStatement().execute(ddl);
         ddl = "CREATE VIEW s2.v2 (v2 VARCHAR) AS SELECT * FROM " + fullTableName + " WHERE k > 10";
         conn.createStatement().execute(ddl);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2da5ff2a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 93bb8e5..8bea46b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -113,7 +113,6 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
@@ -1554,14 +1553,18 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             scan.setStopRow(stopRow);
         }
         SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, linkTypeBytes);
+        SingleColumnValueFilter tableTypeFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES,
+                CompareOp.EQUAL, PTableType.VIEW.getSerializedValue().getBytes());
+        tableTypeFilter.setFilterIfMissing(false);
         linkFilter.setFilterIfMissing(true);
         byte[] suffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, SchemaUtil
                 .getPhysicalTableName(SchemaUtil.getTableNameAsBytes(schemaName, tableName), table.isNamespaceMapped())
                 .getName());
         SuffixFilter rowFilter = new SuffixFilter(suffix);
-        Filter filter = new FilterList(linkFilter, rowFilter);
+        FilterList filter = new FilterList(linkFilter,tableTypeFilter,rowFilter);
         scan.setFilter(filter);
         scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
+        scan.addColumn(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
         scan.addColumn(TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
         // Original region-only scanner modified due to PHOENIX-1208

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2da5ff2a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f7ea5d9..25effc9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -41,8 +41,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -2615,7 +2615,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)) {
                         if (!admin.tableExists(mappedSystemTable)) {
                             UpgradeUtil.mapTableToNamespace(admin, metatable,
-                                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM);
+                                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM,
+                                    null);
                             ConnectionQueryServicesImpl.this.removeTable(null,
                                     PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
                                     MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
@@ -2623,7 +2624,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
                     }
                     for (String table : tableNames) {
-                        UpgradeUtil.mapTableToNamespace(admin, metatable, table, props, null, PTableType.SYSTEM);
+                        UpgradeUtil.mapTableToNamespace(admin, metatable, table, props, null, PTableType.SYSTEM,
+                                null);
                         ConnectionQueryServicesImpl.this.removeTable(null, table, null,
                                 MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2da5ff2a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 77dccb1..d9cd666 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -273,8 +273,9 @@
             TABLE_NAME + "," +
             COLUMN_FAMILY + "," +
             LINK_TYPE + "," +
-            TABLE_SEQ_NUM + // this is actually set to the parent table's sequence number
-            ") VALUES (?, ?, ?, ?, ?, ?)";
+            TABLE_SEQ_NUM +","+ // this is actually set to the parent table's sequence number
+            TABLE_TYPE +
+            ") VALUES (?, ?, ?, ?, ?, ?, ?)";
     private static final String CREATE_VIEW_LINK =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
             TENANT_ID + "," +
@@ -1708,6 +1709,7 @@
                 linkStatement.setString(4, tableName);
                 linkStatement.setByte(5, LinkType.INDEX_TABLE.getSerializedValue());
                 linkStatement.setLong(6, parent.getSequenceNumber());
+                linkStatement.setString(7, PTableType.INDEX.getSerializedValue());
                 linkStatement.execute();
             }
 
@@ -1960,8 +1962,10 @@
                     PTable physicalTable = connection.getTable(new PTableKey(null, physicalName.getString()
                             .replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR)));
                     linkStatement.setLong(6, physicalTable.getSequenceNumber());
+                    linkStatement.setString(7, null);
                 } else {
                     linkStatement.setLong(6, parent.getSequenceNumber());
+                    linkStatement.setString(7, PTableType.INDEX.getSerializedValue());
                 }
                 linkStatement.execute();
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2da5ff2a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index ae81d37..18a0b23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -17,23 +17,13 @@
  */
 package org.apache.phoenix.util;
 
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
 
 import java.io.IOException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -533,9 +523,7 @@
     public static final String IS_LOCAL_INDEX_TABLE_PROP_NAME = "IS_LOCAL_INDEX_TABLE";
     public static final byte[] IS_LOCAL_INDEX_TABLE_PROP_BYTES = Bytes.toBytes(IS_LOCAL_INDEX_TABLE_PROP_NAME);
 
-    private static final String GET_VIEWS_QUERY = "SELECT " + TABLE_SCHEM + "," + TABLE_NAME + " FROM "
-            + SYSTEM_CATALOG_SCHEMA + "." + SYSTEM_CATALOG_TABLE + " WHERE " + COLUMN_FAMILY + " = ? AND " + LINK_TYPE
-            + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue();
+
     public static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp){
         return newTableRowsScan(key, null, startTimeStamp, stopTimeStamp);
@@ -580,17 +568,6 @@
         }
     }
 
-    public static Set<String> getViewNames(PhoenixConnection conn, String table) throws SQLException {
-        Set<String> viewNames = new HashSet<String>();
-        PreparedStatement preparedStatment = conn.prepareStatement(GET_VIEWS_QUERY);
-        preparedStatment.setString(1, SchemaUtil.normalizeIdentifier(table));
-        ResultSet rs = preparedStatment.executeQuery();
-        while (rs.next()) {
-            viewNames.add(SchemaUtil.getTableName(rs.getString(1), rs.getString(2)));
-        }
-        return viewNames;
-    }
-
     public static String getAutoPartitionColumnName(PTable parentTable) {
         List<PColumn> parentTableColumns = parentTable.getPKColumns();
         PColumn column = parentTableColumns.get(getAutoPartitionColIndex(parentTable));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2da5ff2a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 725f90a..a690dd8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -50,14 +50,12 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -70,7 +68,6 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.monitoring.GlobalMetric;
-import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -222,11 +219,7 @@
                     String srcTable = execCmd.getSrcTable();
                     System.out.println("Starting upgrading table:" + srcTable + "... please don't kill it in between!!");
                     UpgradeUtil.upgradeTable(conn, srcTable);
-                    Set<String> viewNames = MetaDataUtil.getViewNames(conn, srcTable);
-                    System.out.println("upgrading following views:"+viewNames);
-                    for (String viewName : viewNames) {
-                        UpgradeUtil.upgradeTable(conn, viewName);
-                    }
+                    UpgradeUtil.mapChildViewsToNamespace(conn, srcTable,props);
                 } else if (execCmd.isUpgrade()) {
                     if (conn.getClientInfo(PhoenixRuntime.CURRENT_SCN_ATTRIB) != null) { throw new SQLException(
                             "May not specify the CURRENT_SCN property when upgrading"); }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2da5ff2a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index bfe37b0..9046287 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -43,6 +43,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TAB
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
@@ -61,9 +62,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeoutException;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -155,14 +156,20 @@
             TABLE_NAME + "," +
             COLUMN_FAMILY + "," +
             LINK_TYPE + "," +
-            TABLE_SEQ_NUM +
+            TABLE_SEQ_NUM +"," +
+            TABLE_TYPE +
             ") SELECT " + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + ",'%s' AS "
-            + COLUMN_FAMILY + " ," + LINK_TYPE + "," + TABLE_SEQ_NUM + " FROM " + SYSTEM_CATALOG_SCHEMA + ".\""
-            + SYSTEM_CATALOG_TABLE + "\" WHERE " + COLUMN_FAMILY + "=? AND " + LINK_TYPE + " = "
+            + COLUMN_FAMILY + " ," + LINK_TYPE + "," + TABLE_SEQ_NUM + "," + TABLE_TYPE +" FROM " + SYSTEM_CATALOG_SCHEMA + ".\""
+            + SYSTEM_CATALOG_TABLE + "\" WHERE (" + TABLE_SCHEM + "=? OR (" + TABLE_SCHEM + " IS NULL AND ? IS NULL)) AND " + TABLE_NAME + "=? AND " + COLUMN_FAMILY + "=? AND " + LINK_TYPE + " = "
             + LinkType.PHYSICAL_TABLE.getSerializedValue();
     private static final String DELETE_LINK = "DELETE FROM " + SYSTEM_CATALOG_SCHEMA + "." + SYSTEM_CATALOG_TABLE
-            + " WHERE " + COLUMN_FAMILY + "=? AND " + LINK_TYPE + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue();
+            + " WHERE (" + TABLE_SCHEM + "=? OR (" + TABLE_SCHEM + " IS NULL AND ? IS NULL)) AND " + TABLE_NAME + "=? AND " + COLUMN_FAMILY + "=? AND " + LINK_TYPE + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue();
+
+    private static final String GET_VIEWS_QUERY = "SELECT " + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME
+            + " FROM " + SYSTEM_CATALOG_SCHEMA + "." + SYSTEM_CATALOG_TABLE + " WHERE " + COLUMN_FAMILY + " = ? AND "
+            + LINK_TYPE + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue() + " AND ( " + TABLE_TYPE + "=" + "'"
+            + PTableType.VIEW.getSerializedValue() + "' OR " + TABLE_TYPE + " IS NULL) ORDER BY "+TENANT_ID;
 
     private UpgradeUtil() {
     }
@@ -1637,7 +1644,7 @@
     }
 
     private static void mapTableToNamespace(HBaseAdmin admin, HTableInterface metatable, String srcTableName,
-            String destTableName, ReadOnlyProps props, Long ts, String phoenixTableName, PTableType pTableType)
+            String destTableName, ReadOnlyProps props, Long ts, String phoenixTableName, PTableType pTableType,PName tenantId)
             throws SnapshotCreationException, IllegalArgumentException, IOException, InterruptedException,
             SQLException {
         srcTableName = SchemaUtil.normalizeIdentifier(srcTableName);
@@ -1667,25 +1674,26 @@
             }
         }
 
-        byte[] tableKey = SchemaUtil.getTableKey(null, SchemaUtil.getSchemaNameFromFullName(phoenixTableName),
+        byte[] tableKey = SchemaUtil.getTableKey(tenantId != null ? tenantId.getString() : null,
+                SchemaUtil.getSchemaNameFromFullName(phoenixTableName),
                 SchemaUtil.getTableNameFromFullName(phoenixTableName));
         List<Cell> columnCells = metatable.get(new Get(tableKey))
                 .getColumnCells(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
         if (ts == null) {
             if (!columnCells.isEmpty()) {
                 ts = columnCells.get(0).getTimestamp();
-            } else {
-                throw new IllegalArgumentException(
-                        "Timestamp passed is null and cannot derive timestamp for " + tableKey + " from meta table!!");
-            }
+            } else if (PTableType.SYSTEM != pTableType) { throw new IllegalArgumentException(
+                    "Timestamp passed is null and cannot derive timestamp for " + tableKey + " from meta table!!"); }
+        }
+        if (ts != null) {
+            // Update flag to represent table is mapped to namespace
+            logger.info(String.format("Updating meta information of phoenix table '%s' to map to namespace..",
+                    phoenixTableName));
+            Put put = new Put(tableKey, ts);
+            put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED_BYTES,
+                    PBoolean.INSTANCE.toBytes(Boolean.TRUE));
+            metatable.put(put);
         }
-        // Update flag to represent table is mapped to namespace
-        logger.info(String.format("Updating meta information of phoenix table '%s' to map to namespace..",
-                phoenixTableName));
-        Put put = new Put(tableKey, ts);
-        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED_BYTES,
-                PBoolean.INSTANCE.toBytes(Boolean.TRUE));
-        metatable.put(put);
     }
 
     /*
@@ -1693,18 +1701,16 @@
      * use map table utility in psql.py
      */
     public static void mapTableToNamespace(HBaseAdmin admin, HTableInterface metatable, String tableName,
-            ReadOnlyProps props, Long ts, PTableType pTableType) throws SnapshotCreationException,
+            ReadOnlyProps props, Long ts, PTableType pTableType, PName tenantId) throws SnapshotCreationException,
             IllegalArgumentException, IOException, InterruptedException, SQLException {
         String destTablename = SchemaUtil
                 .normalizeIdentifier(SchemaUtil.getPhysicalTableName(tableName, props).getNameAsString());
-        mapTableToNamespace(admin, metatable, tableName, destTablename, props, ts, tableName, pTableType);
+        mapTableToNamespace(admin, metatable, tableName, destTablename, props, ts, tableName, pTableType, tenantId);
     }
 
     public static void upgradeTable(PhoenixConnection conn, String srcTable) throws SQLException,
             SnapshotCreationException, IllegalArgumentException, IOException, InterruptedException {
         ReadOnlyProps readOnlyProps = conn.getQueryServices().getProps();
-        if (conn.getClientInfo(PhoenixRuntime.TENANT_ID_ATTRIB) != null) { throw new SQLException(
-                "May not specify the TENANT_ID_ATTRIB property when upgrading"); }
         if (conn.getSchema() != null) { throw new IllegalArgumentException(
                 "Schema should not be set for connection!!"); }
         if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.TABLE,
@@ -1719,8 +1725,7 @@
             String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
             // Confirm table is not already upgraded
             PTable table = PhoenixRuntime.getTable(conn, tableName);
-            if (table.isMultiTenant()) { throw new IllegalArgumentException(
-                    "Sorry!! currently support for upgrading multi-tenant table to map to namespace is not supported!!"); }
+
            // Upgrade is not required if schemaName is not present.
             if (schemaName.equals("") && !PTableType.VIEW
                     .equals(table.getType())) { throw new IllegalArgumentException("Table doesn't have schema name"); }
 
@@ -1730,36 +1735,37 @@
                 logger.info(String.format("Creating schema %s..", schemaName));
                 conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
             }
-            String newPhysicalTablename = SchemaUtil.normalizeIdentifier(SchemaUtil
-                    .getPhysicalTableName(table.getPhysicalName().getString(), readOnlyProps).getNameAsString());
+            String oldPhysicalName = table.getPhysicalName().getString();
+            String newPhysicalTablename = SchemaUtil.normalizeIdentifier(
+                    SchemaUtil.getPhysicalTableName(oldPhysicalName, readOnlyProps).getNameAsString());
             logger.info(String.format("Upgrading %s %s..", table.getType(), tableName));
             // Upgrade the data or main table
             mapTableToNamespace(admin, metatable, tableName, newPhysicalTablename, readOnlyProps,
-                    PhoenixRuntime.getCurrentScn(readOnlyProps), tableName, table.getType());
-            conn.close();
+                    PhoenixRuntime.getCurrentScn(readOnlyProps), tableName, table.getType(),conn.getTenantId());
             // clear the cache and get new table
-            conn.getQueryServices().clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, table.getSchemaName().getBytes(),
-                    table.getTableName().getBytes(), PhoenixRuntime.getCurrentScn(readOnlyProps));
-            MetaDataMutationResult result = new MetaDataClient(conn).updateCache(schemaName,
-                    SchemaUtil.getTableNameFromFullName(tableName));
+            conn.getQueryServices().clearTableFromCache(
+                    conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : conn.getTenantId().getBytes(),
+                    table.getSchemaName().getBytes(), table.getTableName().getBytes(),
+                    PhoenixRuntime.getCurrentScn(readOnlyProps));
+            MetaDataMutationResult result = new MetaDataClient(conn).updateCache(conn.getTenantId(),schemaName,
+                    SchemaUtil.getTableNameFromFullName(tableName),true);
             if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { throw new TableNotFoundException(
                     tableName); }
             table = result.getTable();
+            // check whether table is properly upgraded before upgrading indexes
             if (table.isNamespaceMapped()) {
                 for (PTable index : table.getIndexes()) {
                     String srcTableName = index.getPhysicalName().getString();
-                    if (srcTableName.contains(QueryConstants.NAMESPACE_SEPARATOR)
-                            || (!MetaDataUtil.isViewIndex(srcTableName) && PTableType.VIEW.equals(table.getType()))) {
-                        // this condition occurs in case of multiple views on same table
-                        // as all view indexes uses the same physical table, so if one view is already migrated then we
-                        // can skip migrating the physical table again
-                        logger.info(String.format("skipping as it seems index '%s' is already upgraded..", index.getName()));
-                        continue;
-                    }
                     String destTableName = null;
                     String phoenixTableName = index.getName().getString();
-                    boolean updateLink = false;
+                    boolean updateLink = true;
+                    if (srcTableName.contains(QueryConstants.NAMESPACE_SEPARATOR)) {
+                        // Skip already migrated
+                        logger.info(String.format("skipping as it seems index '%s' is already upgraded..",
+                                index.getName()));
+                        continue;
+                    }
                     if (MetaDataUtil.isLocalIndex(srcTableName)) {
                         logger.info(String.format("local index '%s' found with physical hbase table name ''..",
                                 index.getName(), srcTableName));
@@ -1769,13 +1775,11 @@
                         conn.createStatement()
                                 .execute(String.format("ALTER TABLE %s set " + MetaDataUtil.PARENT_TABLE_KEY + "='%s'",
                                         phoenixTableName, table.getPhysicalName()));
-                        updateLink = true;
                     } else if (MetaDataUtil.isViewIndex(srcTableName)) {
                         logger.info(String.format("View index '%s' found with physical hbase table name ''..",
                                 index.getName(), srcTableName));
                         destTableName = Bytes
                                 .toString(MetaDataUtil.getViewIndexPhysicalName(newPhysicalTablename.getBytes()));
-                        updateLink = true;
                     } else {
                         logger.info(String.format("Global index '%s' found with physical hbase table name ''..",
                                 index.getName(), srcTableName));
@@ -1784,14 +1788,19 @@
                                 .getNameAsString();
                     }
                     logger.info(String.format("Upgrading index %s..", index.getName()));
-                    mapTableToNamespace(admin, metatable, srcTableName, destTableName, readOnlyProps,
-                            PhoenixRuntime.getCurrentScn(readOnlyProps), phoenixTableName, index.getType());
+                    if (!(table.getType() == PTableType.VIEW && !MetaDataUtil.isViewIndex(srcTableName)
+                            && IndexType.LOCAL != index.getIndexType())) {
+                        mapTableToNamespace(admin, metatable, srcTableName, destTableName, readOnlyProps,
+                                PhoenixRuntime.getCurrentScn(readOnlyProps), phoenixTableName, index.getType(),
+                                conn.getTenantId());
+                    }
                     if (updateLink) {
                         logger.info(String.format("Updating link information for index '%s' ..", index.getName()));
-                        updateLink(conn, srcTableName, destTableName);
+                        updateLink(conn, srcTableName, destTableName,index.getSchemaName(),index.getTableName());
                         conn.commit();
                     }
-                    conn.getQueryServices().clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
+                    conn.getQueryServices().clearTableFromCache(
+                            conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : conn.getTenantId().getBytes(),
                             index.getSchemaName().getBytes(), index.getTableName().getBytes(),
                             PhoenixRuntime.getCurrentScn(readOnlyProps));
                 }
@@ -1801,6 +1810,10 @@
             } else {
                 throw new RuntimeException("Error: problem occured during upgrade. Table is not upgraded successfully");
             }
+            if (table.getType() == PTableType.VIEW) {
+                updateLink(conn, oldPhysicalName, newPhysicalTablename,table.getSchemaName(),table.getTableName());
+                conn.commit();
+            }
         }
     }
 
@@ -1814,25 +1827,65 @@
         String newSchemaName = MetaDataUtil.getViewIndexSequenceSchemaName(physicalName, true);
         String newSequenceName = MetaDataUtil.getViewIndexSequenceName(physicalName, tenantId, true);
         // create new entry with new schema format
-        String upsert = "UPSERT INTO " + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " SELECT " + TENANT_ID + ",\'"
-                + newSchemaName + "\',\'" + newSequenceName + "\'," + START_WITH + "," + CURRENT_VALUE + ","
-                + INCREMENT_BY + "," + CACHE_SIZE + "," + MIN_VALUE + "," + MAX_VALUE + "," + CYCLE_FLAG + ","
-                + LIMIT_REACHED_FLAG + " FROM " + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " WHERE "
-                + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL AND " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '"
-                + oldSchemaName + "'";
+        String upsert = "UPSERT INTO " + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " SELECT REGEXP_SPLIT("
+                + PhoenixDatabaseMetaData.SEQUENCE_NAME + ",'_')[3] ,\'" + newSchemaName + "\',\'" + newSequenceName
+                + "\'," + START_WITH + "," + CURRENT_VALUE + "," + INCREMENT_BY + "," + CACHE_SIZE + "," + MIN_VALUE
+                + "," + MAX_VALUE + "," + CYCLE_FLAG + "," + LIMIT_REACHED_FLAG + " FROM "
+                + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " WHERE " + PhoenixDatabaseMetaData.TENANT_ID
+                + " IS NULL AND " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '" + oldSchemaName + "'";
         connection.createStatement().executeUpdate(upsert);
         // delete old sequence
         MetaDataUtil.deleteViewIndexSequences(connection, oldPhysicalName, false);
     }
 
-    private static void updateLink(PhoenixConnection conn, String srcTableName, String destTableName)
+    private static void updateLink(PhoenixConnection conn, String srcTableName, String destTableName, PName schemaName, PName tableName)
             throws SQLException {
+        PreparedStatement updateLinkStatment = conn.prepareStatement(String.format(UPDATE_LINK,destTableName));
+        updateLinkStatment.setString(1, schemaName.getString());
+        updateLinkStatment.setString(2, schemaName.getString());
+        updateLinkStatment.setString(3, tableName.getString());
+        updateLinkStatment.setString(4, srcTableName);
+
+        updateLinkStatment.execute();
         PreparedStatement deleteLinkStatment = conn.prepareStatement(DELETE_LINK);
-        deleteLinkStatment.setString(1, srcTableName);
-        PreparedStatement updateLinkStatment = conn.prepareStatement(String.format(UPDATE_LINK, destTableName));
-        updateLinkStatment.setString(1, srcTableName);
+        deleteLinkStatment.setString(1, schemaName.getString());
+        deleteLinkStatment.setString(2, schemaName.getString());
+        deleteLinkStatment.setString(3, tableName.getString());
+        deleteLinkStatment.setString(4, srcTableName);
         deleteLinkStatment.execute();
-        updateLinkStatment.execute();
+
+    }
+
+    public static void mapChildViewsToNamespace(PhoenixConnection conn, String table, Properties props)
+            throws SQLException, SnapshotCreationException, IllegalArgumentException, IOException,
+            InterruptedException {
+        PreparedStatement preparedStatment = conn.prepareStatement(GET_VIEWS_QUERY);
+        preparedStatment.setString(1, SchemaUtil.normalizeIdentifier(table));
+        ResultSet rs = preparedStatment.executeQuery();
+        String tenantId = null;
+        String prevTenantId = null;
+        PhoenixConnection passedConn = conn;
+        while (rs.next()) {
+            tenantId = rs.getString(1);
+            if (prevTenantId != tenantId) {
+                if (tenantId != null) {
+                    props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+                } else {
+                    props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+                }
+                if (passedConn != conn) {
+                    conn.close();
+                }
+                conn = DriverManager.getConnection(conn.getURL(), props).unwrap(PhoenixConnection.class);
+            }
+            String viewName=SchemaUtil.getTableName(rs.getString(2), rs.getString(3));
+            logger.info(String.format("Upgrading view %s for tenantId %s..", viewName,tenantId));
+            UpgradeUtil.upgradeTable(conn, viewName);
+            prevTenantId = tenantId;
+        }
+        if (passedConn != conn) {
+            conn.close();
+        }
     }
 }
\ No newline at end of file
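
For reference, a minimal sketch (not part of the commit) of how the upgrade path introduced here is driven from client code, mirroring the UpgradeIT test above. The JDBC URL, class name, and table name are placeholders; only UpgradeUtil.upgradeTable, UpgradeUtil.mapChildViewsToNamespace, and the namespace-mapping client property come from this change.

import java.sql.DriverManager;
import java.util.Properties;

import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.UpgradeUtil;

public class NamespaceUpgradeExample {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:phoenix:localhost";   // placeholder cluster URL
        String fullTableName = "TEST.S_NEW1";    // placeholder schema-qualified table

        Properties props = new Properties();
        // Namespace mapping must be enabled on the client, as in the IT above.
        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));

        try (PhoenixConnection conn = DriverManager.getConnection(url, props)
                .unwrap(PhoenixConnection.class)) {
            // Map the base (possibly multi-tenant) table to its namespace-mapped physical table.
            UpgradeUtil.upgradeTable(conn, fullTableName);
            // Then upgrade the child views (global and tenant-owned) found in SYSTEM.CATALOG.
            UpgradeUtil.mapChildViewsToNamespace(conn, fullTableName, props);
        }
    }
}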
