This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb4313 PHOENIX-4940 Add tenantId parameter to index tool
1bb4313 is described below
commit 1bb43138e47d8c5a72e12c21941b63908fa5069a
Author: Gokcen Iskender <[email protected]>
AuthorDate: Thu Jan 31 11:04:03 2019 -0800
PHOENIX-4940 Add tenantId parameter to index tool
Signed-off-by: Geoffrey Jacoby <[email protected]>
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 112 +++++++++++++++++++--
.../apache/phoenix/mapreduce/index/IndexTool.java | 44 +++++---
2 files changed, 138 insertions(+), 18 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index dfe4634..c1a455a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -72,13 +74,15 @@ public class IndexToolIT extends
BaseUniqueNamesOwnClusterIT {
private final boolean directApi;
private final String tableDDLOptions;
private final boolean useSnapshot;
+ private final boolean useTenantId;
public IndexToolIT(String transactionProvider, boolean mutable, boolean
localIndex,
- boolean directApi, boolean useSnapshot) {
+ boolean directApi, boolean useSnapshot, boolean useTenantId) {
this.localIndex = localIndex;
this.transactional = transactionProvider != null;
this.directApi = directApi;
this.useSnapshot = useSnapshot;
+ this.useTenantId = useTenantId;
StringBuilder optionBuilder = new StringBuilder();
if (!mutable) {
optionBuilder.append(" IMMUTABLE_ROWS=true ");
@@ -124,13 +128,15 @@ public class IndexToolIT extends
BaseUniqueNamesOwnClusterIT {
.isUnsupported(Feature.ALLOW_LOCAL_INDEX)) {
for (boolean directApi : Booleans) {
for (boolean useSnapshot : Booleans) {
- list.add(new Object[] { transactionProvider,
mutable, localIndex, directApi, useSnapshot });
+ list.add(new Object[] { transactionProvider,
mutable, localIndex, directApi, useSnapshot, false});
}
}
}
}
}
}
+ // Add one extra parameter set with useTenantId = true to exercise the tenant-id path
+ list.add(new Object[] { "", false, false, true, false, true});
return TestUtil.filterTxParamData(list,0);
}
@@ -229,6 +235,89 @@ public class IndexToolIT extends
BaseUniqueNamesOwnClusterIT {
}
@Test
+ public void testIndexToolWithTenantId() throws Exception {
+ if (!useTenantId) { return;}
+ String tenantId = generateUniqueName();
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String viewTenantName = generateUniqueName();
+ String indexNameGlobal = generateUniqueName();
+ String indexNameTenant = generateUniqueName();
+ String viewIndexTableName = "_IDX_" + dataTableName;
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection connGlobal = DriverManager.getConnection(getUrl(), props);
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ Connection connTenant = DriverManager.getConnection(getUrl(), props);
+ String createTableStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT
NULL, ID INTEGER NOT NULL, NAME VARCHAR, "
+ + "CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID))
MULTI_TENANT=true";
+ String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
+
+ String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME)
VALUES('%s' , %d, '%s')";
+ String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
+
+ try {
+ String tableStmtGlobal = String.format(createTableStr,
dataTableName);
+ connGlobal.createStatement().execute(tableStmtGlobal);
+
+ String viewStmtTenant = String.format(createViewStr,
viewTenantName, dataTableName);
+ connTenant.createStatement().execute(viewStmtTenant);
+
+ String idxStmtTenant = String.format(createIndexStr,
indexNameTenant, viewTenantName);
+ connTenant.createStatement().execute(idxStmtTenant);
+
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, viewTenantName,
tenantId, 1, "x"));
+ connTenant.commit();
+
+ runIndexTool(true, false, "", viewTenantName, indexNameTenant,
tenantId, 0,
+ new String[0]);
+
+ String selectSql = String.format("SELECT ID FROM %s WHERE
NAME='x'", viewTenantName);
+ ResultSet rs = connTenant.createStatement().executeQuery("EXPLAIN
" + selectSql);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ assertExplainPlan(false, actualExplainPlan, "",
viewIndexTableName);
+ rs = connTenant.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+
+ // Truncate the view index table, then rebuild the tenant view index with the tool.
+ ConnectionQueryServices queryServices =
connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
+ Admin admin = queryServices.getAdmin();
+ TableName tableName = TableName.valueOf(viewIndexTableName);
+ admin.disableTable(tableName);
+ admin.truncateTable(tableName, false);
+
+ runIndexTool(true, false, "", viewTenantName, indexNameTenant,
tenantId, 0,
+ new String[0]);
+ Table htable=
queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+ int count = getUtility().countRows(htable);
+ // Confirm index has rows
+ assertTrue(count == 1);
+
+ selectSql = String.format("SELECT /*+ INDEX(%s) */ COUNT(*) FROM
%s", indexNameTenant, viewTenantName);
+ rs = connTenant.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+
+ String idxStmtGlobal =
+ String.format(createIndexStr, indexNameGlobal,
dataTableName);
+ connGlobal.createStatement().execute(idxStmtGlobal);
+
+ // run the index MR job this time with tenant id.
+ // We expect it to return -1 because indexTable is not correct for
this tenant.
+ runIndexTool(true, false, schemaName, dataTableName,
indexNameGlobal,
+ tenantId, -1, new String[0]);
+
+ } finally {
+ connGlobal.close();
+ connTenant.close();
+ }
+ }
+
+ @Test
public void testSaltedVariableLengthPK() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -332,7 +421,7 @@ public class IndexToolIT extends
BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute(indexDDL);
// run with 50% sampling rate, split if data table more than 3
regions
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName,
indexTableName, "-sp", "50", "-spa", "3");
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName,
indexTableName, null,"-sp", "50", "-spa", "3");
assertEquals(targetNumRegions,
admin.getTableRegions(indexTN).size());
List<Cell> values = new ArrayList<>();
@@ -361,7 +450,7 @@ public class IndexToolIT extends
BaseUniqueNamesOwnClusterIT {
}
public static String[] getArgValues(boolean directApi, boolean
useSnapshot, String schemaName,
- String dataTable, String indxTable) {
+ String dataTable, String indxTable, String tenantId) {
final List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
@@ -381,6 +470,11 @@ public class IndexToolIT extends
BaseUniqueNamesOwnClusterIT {
args.add("-snap");
}
+ if (tenantId != null) {
+ args.add("-tenant");
+ args.add(tenantId);
+ }
+
args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
return args.toArray(new String[0]);
@@ -401,15 +495,21 @@ public class IndexToolIT extends
BaseUniqueNamesOwnClusterIT {
public static void runIndexTool(boolean directApi, boolean useSnapshot,
String schemaName,
String dataTableName, String indexTableName, String...
additionalArgs) throws Exception {
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName,
indexTableName, null, 0, additionalArgs);
+ }
+
+ public static void runIndexTool(boolean directApi, boolean useSnapshot,
String schemaName,
+ String dataTableName, String indexTableName, String tenantId, int
expectedStatus,
+ String... additionalArgs) throws Exception {
IndexTool indexingTool = new IndexTool();
Configuration conf = new
Configuration(getUtility().getConfiguration());
conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
indexingTool.setConf(conf);
final String[] cmdArgs =
- getArgValues(directApi, useSnapshot, schemaName,
dataTableName, indexTableName);
+ getArgValues(directApi, useSnapshot, schemaName,
dataTableName, indexTableName, tenantId);
List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
cmdArgList.addAll(Arrays.asList(additionalArgs));
int status = indexingTool.run(cmdArgList.toArray(new
String[cmdArgList.size()]));
- assertEquals(0, status);
+ assertEquals(expectedStatus, status);
}
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index a69fa47..1e62838 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -149,6 +149,8 @@ public class IndexTool extends Configured implements Tool {
"Output path where the files are written");
private static final Option SNAPSHOT_OPTION = new Option("snap",
"snapshot", false,
"If specified, uses Snapshots for async index building (optional)");
+ private static final Option TENANT_ID_OPTION = new Option("tenant",
"tenant-id", true,
+ "If specified, uses Tenant connection for tenant view index building
(optional)");
private static final Option HELP_OPTION = new Option("h", "help", false,
"Help");
public static final String INDEX_JOB_NAME_TEMPLATE =
"PHOENIX_%s.%s_INDX_%s";
@@ -162,6 +164,7 @@ public class IndexTool extends Configured implements Tool {
options.addOption(RUN_FOREGROUND_OPTION);
options.addOption(OUTPUT_PATH_OPTION);
options.addOption(SNAPSHOT_OPTION);
+ options.addOption(TENANT_ID_OPTION);
options.addOption(HELP_OPTION);
AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true);
options.addOption(AUTO_SPLIT_INDEX_OPTION);
@@ -247,15 +250,15 @@ public class IndexTool extends Configured implements Tool
{
}
public Job getJob(String schemaName, String indexTable, String
dataTable, boolean useDirectApi, boolean isPartialBuild,
- boolean useSnapshot) throws Exception {
+ boolean useSnapshot, String tenantId) throws Exception {
if (isPartialBuild) {
- return configureJobForPartialBuild(schemaName, dataTable);
+ return configureJobForPartialBuild(schemaName, dataTable,
tenantId);
} else {
- return configureJobForAsyncIndex(schemaName, indexTable,
dataTable, useDirectApi, useSnapshot);
+ return configureJobForAsyncIndex(schemaName, indexTable,
dataTable, useDirectApi, useSnapshot, tenantId);
}
}
- private Job configureJobForPartialBuild(String schemaName, String
dataTable) throws Exception {
+ private Job configureJobForPartialBuild(String schemaName, String
dataTable, String tenantId) throws Exception {
final String qDataTable =
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
final PTable pdataTable = PhoenixRuntime.getTable(connection,
qDataTable);
connection = ConnectionUtil.getInputConnection(configuration);
@@ -303,7 +306,10 @@ public class IndexTool extends Configured implements Tool {
ImmutableBytesWritable indexMetaDataPtr = new
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr,
disabledPIndexes, connection.unwrap(PhoenixConnection.class));
PhoenixConfigurationUtil.setIndexMaintainers(configuration,
indexMetaDataPtr);
-
+ if (tenantId != null) {
+ PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+ }
+
//Prepare raw scan
Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
@@ -364,7 +370,7 @@ public class IndexTool extends Configured implements Tool {
}
- private Job configureJobForAsyncIndex(String schemaName, String
indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot)
+ private Job configureJobForAsyncIndex(String schemaName, String
indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String
tenantId)
throws Exception {
final String qDataTable =
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
final String qIndexTable;
@@ -408,6 +414,9 @@ public class IndexTool extends Configured implements Tool {
PhoenixConfigurationUtil.setDisableIndexes(configuration,
indexTable);
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
indexColumns.toArray(new String[indexColumns.size()]));
+ if (tenantId != null) {
+ PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+ }
final List<ColumnInfo> columnMetadataList =
PhoenixRuntime.generateColumnInfo(connection, qIndexTable,
indexColumns);
ColumnInfoToStringEncoderDecoder.encode(configuration,
columnMetadataList);
@@ -536,14 +545,20 @@ public class IndexTool extends Configured implements Tool
{
String
basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
boolean isForeground =
cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
- connection = ConnectionUtil.getInputConnection(configuration);
+ boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
byte[][] splitKeysBeforeJob = null;
boolean isLocalIndexBuild = false;
PTable pindexTable = null;
+ String tenantId = null;
+ if (useTenantId) {
+ tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+ configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ connection = ConnectionUtil.getInputConnection(configuration);
if (indexTable != null) {
- if (!isValidIndexTable(connection, qDataTable,indexTable)) {
+ if (!isValidIndexTable(connection, qDataTable,indexTable,
tenantId)) {
throw new IllegalArgumentException(String.format(
- " %s is not an index table for %s ", indexTable,
qDataTable));
+ " %s is not an index table for %s for this
connection", indexTable, qDataTable));
}
pindexTable = PhoenixRuntime.getTable(connection, schemaName
!= null && !schemaName.isEmpty()
? SchemaUtil.getQualifiedTableName(schemaName,
indexTable) : indexTable);
@@ -581,7 +596,7 @@ public class IndexTool extends Configured implements Tool {
}
Job job = new JobFactory(connection, configuration,
outputPath).getJob(schemaName, indexTable, dataTable,
- useDirectApi, isPartialBuild, useSnapshot);
+ useDirectApi, isPartialBuild, useSnapshot, tenantId);
if (!isForeground && useDirectApi) {
LOG.info("Running Index Build in Background - Submit async and
exit");
job.submit();
@@ -742,18 +757,23 @@ public class IndexTool extends Configured implements Tool
{
* @param connection
* @param masterTable
* @param indexTable
+ * @param tenantId tenant id passed as the catalog to getIndexInfo; when null, "" is used
* @return
* @throws SQLException
*/
private boolean isValidIndexTable(final Connection connection, final
String masterTable,
- final String indexTable) throws SQLException {
+ final String indexTable, final String tenantId) throws
SQLException {
final DatabaseMetaData dbMetaData = connection.getMetaData();
final String schemaName =
SchemaUtil.getSchemaNameFromFullName(masterTable);
final String tableName =
SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));
ResultSet rs = null;
try {
- rs = dbMetaData.getIndexInfo("", schemaName, tableName, false,
false);
+ String catalog = "";
+ if (tenantId != null) {
+ catalog = tenantId;
+ }
+ rs = dbMetaData.getIndexInfo(catalog, schemaName, tableName,
false, false);
while (rs.next()) {
final String indexName = rs.getString(6);
if (indexTable.equalsIgnoreCase(indexName)) {