This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git

The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new 152f86d  Add tenantId param to IndexTool
152f86d is described below

commit 152f86d00ea7d53b969deba4116d2ead4a660037
Author: Gokcen Iskender <gisken...@salesforce.com>
AuthorDate: Mon Feb 11 12:58:53 2019 -0800

    Add tenantId param to IndexTool

    Signed-off-by: Geoffrey Jacoby <gjac...@apache.org>
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 114 ++++++++++++++++++++-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  43 +++++---
 2 files changed, 140 insertions(+), 17 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index e096bb5..c185f39 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -40,17 +40,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import 
org.apache.phoenix.util.ReadOnlyProps; @@ -73,13 +77,15 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { private final boolean directApi; private final String tableDDLOptions; private final boolean useSnapshot; + private final boolean useTenantId; public IndexToolIT(String transactionProvider, boolean mutable, boolean localIndex, - boolean directApi, boolean useSnapshot) { + boolean directApi, boolean useSnapshot, boolean useTenantId) { this.localIndex = localIndex; this.transactional = transactionProvider != null; this.directApi = directApi; this.useSnapshot = useSnapshot; + this.useTenantId = useTenantId; StringBuilder optionBuilder = new StringBuilder(); if (!mutable) { optionBuilder.append(" IMMUTABLE_ROWS=true "); @@ -125,13 +131,16 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { .isUnsupported(Feature.ALLOW_LOCAL_INDEX)) { for (boolean directApi : Booleans) { for (boolean useSnapshot : Booleans) { - list.add(new Object[] { transactionProvider, mutable, localIndex, directApi, useSnapshot }); + list.add(new Object[] { transactionProvider, mutable, localIndex, + directApi, useSnapshot, false}); } } } } } } + // Add the usetenantId + list.add(new Object[] { "", false, false, true, false, true}); return TestUtil.filterTxParamData(list,0); } @@ -230,6 +239,90 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } @Test + public void testIndexToolWithTenantId() throws Exception { + if (!useTenantId) { return;} + String tenantId = generateUniqueName(); + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String viewTenantName = generateUniqueName(); + String indexNameGlobal = generateUniqueName(); + String indexNameTenant = generateUniqueName(); + String viewIndexTableName = "_IDX_" + dataTableName; + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection connGlobal = DriverManager.getConnection(getUrl(), props); + 
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + Connection connTenant = DriverManager.getConnection(getUrl(), props); + String createTblStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL,ID INTEGER NOT NULL" + + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true"; + String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s"; + + String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME) VALUES('%s' , %d, '%s')"; + String createIndexStr = "CREATE INDEX %s ON %s (NAME) "; + + try { + String tableStmtGlobal = String.format(createTblStr, dataTableName); + connGlobal.createStatement().execute(tableStmtGlobal); + + String viewStmtTenant = String.format(createViewStr, viewTenantName, dataTableName); + connTenant.createStatement().execute(viewStmtTenant); + + String idxStmtTenant = String.format(createIndexStr, indexNameTenant, viewTenantName); + connTenant.createStatement().execute(idxStmtTenant); + + connTenant.createStatement() + .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 1, "x")); + connTenant.commit(); + + runIndexTool(true, false, "", viewTenantName, indexNameTenant, + tenantId, 0, new String[0]); + + String selectSql = String.format("SELECT ID FROM %s WHERE NAME='x'", viewTenantName); + ResultSet rs = connTenant.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + assertExplainPlan(false, actualExplainPlan, "", viewIndexTableName); + rs = connTenant.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertFalse(rs.next()); + + // Remove from tenant view index and build. 
+ ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices(); + Admin admin = queryServices.getAdmin(); + TableName tableName = TableName.valueOf(viewIndexTableName); + admin.disableTable(tableName); + admin.truncateTable(tableName, false); + + runIndexTool(true, false, "", viewTenantName, indexNameTenant, + tenantId, 0, new String[0]); + Table htable= queryServices.getTable(Bytes.toBytes(viewIndexTableName)); + int count = getUtility().countRows(htable); + // Confirm index has rows + assertTrue(count == 1); + + selectSql = String.format("SELECT /*+ INDEX(%s) */ COUNT(*) FROM %s", + indexNameTenant, viewTenantName); + rs = connTenant.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertFalse(rs.next()); + + String idxStmtGlobal = + String.format(createIndexStr, indexNameGlobal, dataTableName); + connGlobal.createStatement().execute(idxStmtGlobal); + + // run the index MR job this time with tenant id. + // We expect it to return -1 because indexTable is not correct for this tenant. 
+ runIndexTool(true, false, schemaName, dataTableName, indexNameGlobal, + tenantId, -1, new String[0]); + + } finally { + connGlobal.close(); + connTenant.close(); + } + } + + @Test public void testSaltedVariableLengthPK() throws Exception { String schemaName = generateUniqueName(); String dataTableName = generateUniqueName(); @@ -362,7 +455,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { } public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName, - String dataTable, String indxTable) { + String dataTable, String indxTable, String tenantId) { final List<String> args = Lists.newArrayList(); if (schemaName != null) { args.add("-s"); @@ -382,6 +475,11 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { args.add("-snap"); } + if (tenantId != null) { + args.add("-tenant"); + args.add(tenantId); + } + args.add("-op"); args.add("/tmp/" + UUID.randomUUID().toString()); return args.toArray(new String[0]); @@ -402,15 +500,21 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName, String dataTableName, String indexTableName, String... additionalArgs) throws Exception { + runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs); + } + + public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName, + String dataTableName, String indexTableName, String tenantId, int expectedStatus, + String... 
additionalArgs) throws Exception { IndexTool indexingTool = new IndexTool(); Configuration conf = new Configuration(getUtility().getConfiguration()); conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); indexingTool.setConf(conf); final String[] cmdArgs = - getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName); + getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId); List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs)); cmdArgList.addAll(Arrays.asList(additionalArgs)); int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()])); - assertEquals(0, status); + assertEquals(expectedStatus, status); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index a4c82be..dc361c9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -147,6 +147,8 @@ public class IndexTool extends Configured implements Tool { "Output path where the files are written"); private static final Option SNAPSHOT_OPTION = new Option("snap", "snapshot", false, "If specified, uses Snapshots for async index building (optional)"); + private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true, + "If specified, uses Tenant connection for tenant view index building (optional)"); private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s"; @@ -160,6 +162,7 @@ public class IndexTool extends Configured implements Tool { options.addOption(RUN_FOREGROUND_OPTION); options.addOption(OUTPUT_PATH_OPTION); options.addOption(SNAPSHOT_OPTION); + options.addOption(TENANT_ID_OPTION); options.addOption(HELP_OPTION); 
AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true); options.addOption(AUTO_SPLIT_INDEX_OPTION); @@ -245,15 +248,15 @@ public class IndexTool extends Configured implements Tool { } public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild, - boolean useSnapshot) throws Exception { + boolean useSnapshot, String tenantId) throws Exception { if (isPartialBuild) { - return configureJobForPartialBuild(schemaName, dataTable); + return configureJobForPartialBuild(schemaName, dataTable, tenantId); } else { - return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot); + return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot, tenantId); } } - private Job configureJobForPartialBuild(String schemaName, String dataTable) throws Exception { + private Job configureJobForPartialBuild(String schemaName, String dataTable, String tenantId) throws Exception { final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); connection = ConnectionUtil.getInputConnection(configuration); @@ -301,7 +304,10 @@ public class IndexTool extends Configured implements Tool { ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class)); PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr); - + if (tenantId != null) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } + //Prepare raw scan Scan scan = IndexManagementUtil.newLocalStateScan(maintainers); scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp); @@ -362,7 +368,7 @@ public class IndexTool extends Configured implements Tool { } - private Job configureJobForAsyncIndex(String schemaName, String indexTable, String 
dataTable, boolean useDirectApi, boolean useSnapshot) + private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String tenantId) throws Exception { final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); final String qIndexTable; @@ -406,6 +412,9 @@ public class IndexTool extends Configured implements Tool { PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable); PhoenixConfigurationUtil.setUpsertColumnNames(configuration, indexColumns.toArray(new String[indexColumns.size()])); + if (tenantId != null) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns); ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); @@ -532,14 +541,20 @@ public class IndexTool extends Configured implements Tool { String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt()); - connection = ConnectionUtil.getInputConnection(configuration); + boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt()); byte[][] splitKeysBeforeJob = null; boolean isLocalIndexBuild = false; PTable pindexTable = null; + String tenantId = null; + if (useTenantId) { + tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); + configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + connection = ConnectionUtil.getInputConnection(configuration); if (indexTable != null) { - if (!isValidIndexTable(connection, qDataTable,indexTable)) { + if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) { throw new IllegalArgumentException(String.format( - " %s is not an index table for %s ", indexTable, qDataTable)); + " %s is not an index table for %s for this connection", 
indexTable, qDataTable)); } pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty() ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable); @@ -574,7 +589,7 @@ public class IndexTool extends Configured implements Tool { } Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable, - useDirectApi, isPartialBuild, useSnapshot); + useDirectApi, isPartialBuild, useSnapshot, tenantId); if (!isForeground && useDirectApi) { LOG.info("Running Index Build in Background - Submit async and exit"); job.submit(); @@ -733,14 +748,18 @@ public class IndexTool extends Configured implements Tool { * @throws SQLException */ private boolean isValidIndexTable(final Connection connection, final String masterTable, - final String indexTable) throws SQLException { + final String indexTable, final String tenantId) throws SQLException { final DatabaseMetaData dbMetaData = connection.getMetaData(); final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable); final String tableName = SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable)); ResultSet rs = null; try { - rs = dbMetaData.getIndexInfo("", schemaName, tableName, false, false); + String catalog = ""; + if (tenantId != null) { + catalog = tenantId; + } + rs = dbMetaData.getIndexInfo(catalog, schemaName, tableName, false, false); while (rs.next()) { final String indexName = rs.getString(6); if (indexTable.equalsIgnoreCase(indexName)) {