This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 0b5eddc PHOENIX-5415: NPE in getting conf from addHbaseResources in
IndexUpgradeTool
0b5eddc is described below
commit 0b5eddc38a566b2c83cc30aff5a1b9f679efdad0
Author: s.kadam <[email protected]>
AuthorDate: Sun Jul 28 20:47:54 2019 -0700
PHOENIX-5415: NPE in getting conf from addHbaseResources in IndexUpgradeTool
---
.../phoenix/mapreduce/index/IndexUpgradeTool.java | 141 +++++++++++++--------
1 file changed, 88 insertions(+), 53 deletions(-)
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
index 861fb66..daac604 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.mapreduce.index;
import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
@@ -26,6 +27,8 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -55,6 +58,7 @@ import java.util.List;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
import java.io.IOException;
import java.nio.file.Files;
@@ -70,7 +74,7 @@ import java.util.stream.Collectors;
import static
org.apache.phoenix.query.QueryServicesOptions.GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN;
-public class IndexUpgradeTool extends Configured {
+public class IndexUpgradeTool extends Configured implements Tool {
private static final Logger LOGGER =
Logger.getLogger(IndexUpgradeTool.class.getName());
@@ -145,21 +149,6 @@ public class IndexUpgradeTool extends Configured {
return operation;
}
- public static void main (String[] args) {
- CommandLine cmdLine = null;
-
- IndexUpgradeTool iut = new IndexUpgradeTool();
- try {
- cmdLine = iut.parseOptions(args);
- LOGGER.info("Index Upgrade tool initiated: "+ String.join(",",
args));
- } catch (IllegalStateException e) {
- iut.printHelpAndExit(e.getMessage(), iut.getOptions());
- }
- iut.initializeTool(cmdLine);
- iut.prepareToolSetup();
- iut.executeTool();
- }
-
public IndexUpgradeTool(String mode, String tables, String inputFile,
String outputFile, boolean dryRun, IndexTool indexTool) {
this.operation = mode;
@@ -172,6 +161,21 @@ public class IndexUpgradeTool extends Configured {
public IndexUpgradeTool () { }
+ @Override
+ public int run(String[] args) throws Exception {
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parseOptions(args);
+ LOGGER.info("Index Upgrade tool initiated: " + String.join(",",
args));
+ } catch (IllegalStateException e) {
+ printHelpAndExit(e.getMessage(), getOptions());
+ }
+ initializeTool(cmdLine);
+ prepareToolSetup();
+ executeTool();
+ return 0;
+ }
+
/**
* Parses the commandline arguments, throws IllegalStateException if
mandatory arguments are
* missing.
@@ -329,7 +333,7 @@ public class IndexUpgradeTool extends Configured {
boolean mutable = !(dataTable.isImmutableRows());
if (!mutable) {
LOGGER.fine("Data table is immutable, waiting for "
- + GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN
+ 1
+ + (GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN
+ 1)
+ " minutes for client cache to expire");
if (!test) {
Thread.sleep(
@@ -368,7 +372,7 @@ public class IndexUpgradeTool extends Configured {
}
LOGGER.info("Disabled data table " + dataTable);
} else {
- LOGGER.info( "Data table " + dataTable +" is already disabled");
+ LOGGER.info( "Data table " + dataTable + " is already disabled");
}
for (String indexName : indexes) {
if (admin.isTableEnabled(TableName.valueOf(indexName))) {
@@ -377,7 +381,7 @@ public class IndexUpgradeTool extends Configured {
}
LOGGER.info("Disabled index table " + indexName);
} else {
- LOGGER.info( "Index table " + indexName +" is already
disabled");
+ LOGGER.info( "Index table " + indexName + " is already
disabled");
}
}
}
@@ -390,7 +394,7 @@ public class IndexUpgradeTool extends Configured {
}
LOGGER.info("Enabled data table " + dataTable);
} else {
- LOGGER.info( "Data table " + dataTable +" is already enabled");
+ LOGGER.info( "Data table " + dataTable + " is already enabled");
}
for (String indexName : indexes) {
if(!admin.isTableEnabled(TableName.valueOf(indexName))) {
@@ -399,7 +403,7 @@ public class IndexUpgradeTool extends Configured {
}
LOGGER.info("Enabled index table " + indexName);
} else {
- LOGGER.info( "Index table " + indexName +" is already
enabled");
+ LOGGER.info( "Index table " + indexName + " is already
enabled");
}
}
}
@@ -431,26 +435,28 @@ public class IndexUpgradeTool extends Configured {
}
}
- private void addCoprocessor(Admin admin, String tableName,
TableDescriptorBuilder tableDescBuilder, String coprocName) throws IOException {
+ private void addCoprocessor(Admin admin, String tableName,
TableDescriptorBuilder tableDescBuilder,
+ String coprocName) throws IOException {
if
(!admin.getDescriptor(TableName.valueOf(tableName)).hasCoprocessor(coprocName))
{
if (!dryRun) {
tableDescBuilder.addCoprocessor(coprocName,
null,
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
}
- LOGGER.info("Loaded "+coprocName+" coprocessor on table " +
tableName);
+ LOGGER.info("Loaded " + coprocName + " coprocessor on table " +
tableName);
} else {
- LOGGER.info(coprocName+" coprocessor on table " + tableName + "is
already loaded");
+ LOGGER.info(coprocName + " coprocessor on table " + tableName +
"is already loaded");
}
}
- private void removeCoprocessor(Admin admin, String tableName,
TableDescriptorBuilder tableDescBuilder, String coprocName) throws IOException {
+ private void removeCoprocessor(Admin admin, String tableName,
TableDescriptorBuilder tableDescBuilder,
+ String coprocName) throws IOException {
if
(admin.getDescriptor(TableName.valueOf(tableName)).hasCoprocessor(coprocName)) {
if (!dryRun) {
tableDescBuilder.removeCoprocessor(coprocName);
}
LOGGER.info("Unloaded "+ coprocName +"coprocessor on table " +
tableName);
} else {
- LOGGER.info(coprocName+" coprocessor on table " + tableName + " is
already unloaded");
+ LOGGER.info(coprocName + " coprocessor on table " + tableName + "
is already unloaded");
}
}
@@ -534,7 +540,7 @@ public class IndexUpgradeTool extends Configured {
//for upgrade or rollback
tablesAndIndexes.put(physicalTableName, physicalIndexes);
} else {
- LOGGER.info("Skipping Table " + tableName + " because it
is "+
+ LOGGER.info("Skipping Table " + tableName + " because it
is " +
(dataTable.isTransactional() ? "transactional" :
"not a data table"));
}
}
@@ -550,13 +556,13 @@ public class IndexUpgradeTool extends Configured {
private void prepareToRebuildIndexes(Connection conn, String
dataTableFullName) {
try {
+ Gson gson = new Gson();
HashMap<String, IndexInfo> rebuildIndexes = new HashMap<>();
HashSet<String> physicalIndexes =
tablesAndIndexes.get(dataTableFullName);
String viewIndexPhysicalName = MetaDataUtil
.getViewIndexPhysicalName(dataTableFullName);
boolean hasViewIndex =
physicalIndexes.contains(viewIndexPhysicalName);
-
String schemaName =
SchemaUtil.getSchemaNameFromFullName(dataTableFullName);
String tableName =
SchemaUtil.getTableNameFromFullName(dataTableFullName);
@@ -572,40 +578,64 @@ public class IndexUpgradeTool extends Configured {
}
if (hasViewIndex) {
- ResultSet
- rs =
- conn.createStatement().executeQuery(
- "SELECT DISTINCT TABLE_NAME, TENANT_ID FROM "
- + "SYSTEM.CATALOG WHERE COLUMN_FAMILY
= \'"
- + viewIndexPhysicalName
- + "\' AND TABLE_TYPE = \'i\' AND " +
"LINK_TYPE = "
- +
PTable.LinkType.PHYSICAL_TABLE.getSerializedValue());
+ String viewSql = "SELECT DISTINCT TABLE_NAME, TENANT_ID FROM "
+ + "SYSTEM.CATALOG "
+ + "WHERE COLUMN_FAMILY = \'" + dataTableFullName + "\'
"
+ + (!StringUtil.EMPTY_STRING.equals(schemaName) ? "AND
TABLE_SCHEM = \'"
+ + schemaName + "\' " : "")
+ + "AND LINK_TYPE = "
+ + PTable.LinkType.PHYSICAL_TABLE.getSerializedValue();
+
+ ResultSet rs = conn.createStatement().executeQuery(viewSql);
+
while (rs.next()) {
- String viewIndexName = rs.getString(1);
+ String viewName = rs.getString(1);
String tenantId = rs.getString(2);
- ResultSet
- innerRS =
- conn.createStatement().executeQuery(
- "SELECT DISTINCT TABLE_NAME FROM "
- + "SYSTEM.CATALOG WHERE
COLUMN_FAMILY = \'"
- + viewIndexName
- + "\' AND TABLE_TYPE = \'i\' AND "
+ "LINK_TYPE = "
- +
PTable.LinkType.INDEX_TABLE.getSerializedValue());
- innerRS.next();
- String viewName = innerRS.getString(1);
- IndexInfo indexInfo = new IndexInfo(schemaName, viewName,
tenantId == null ?
- GLOBAL_INDEX_ID: tenantId, viewIndexName);
- rebuildIndexes.put(viewIndexName, indexInfo);
+ ArrayList<String> viewIndexes = findViewIndexes(conn,
schemaName, viewName,
+ tenantId);
+ for (String viewIndex : viewIndexes) {
+ IndexInfo indexInfo = new IndexInfo(schemaName,
viewName,
+ tenantId == null ? GLOBAL_INDEX_ID : tenantId,
viewIndex);
+ rebuildIndexes.put(viewIndex, indexInfo);
+ }
}
}
- //for rebuilding indexes in case of upgrade.
- rebuildMap.put(dataTableFullName, rebuildIndexes);
+ //for rebuilding indexes in case of upgrade and if there are
indexes on the table/view.
+ if (!rebuildIndexes.isEmpty()) {
+ rebuildMap.put(dataTableFullName, rebuildIndexes);
+ String json = gson.toJson(rebuildMap);
+ LOGGER.info("Index rebuild map " + json);
+ } else {
+ LOGGER.info("No indexes to rebuild for table " +
dataTableFullName);
+ }
+
} catch (SQLException e) {
- LOGGER.severe("Failed to prepare the map for index rebuilds "+e);
+ LOGGER.severe("Failed to prepare the map for index rebuilds " + e);
throw new RuntimeException("Failed to prepare the map for index
rebuilds");
}
}
+ private ArrayList<String> findViewIndexes(Connection conn, String
schemaName, String viewName,
+ String tenantId) throws SQLException {
+
+ String viewIndexesSql = "SELECT DISTINCT COLUMN_FAMILY FROM "
+ + "SYSTEM.CATALOG "
+ + "WHERE TABLE_NAME = \'" + viewName + "\'"
+ + (!StringUtil.EMPTY_STRING.equals(schemaName) ? "AND
TABLE_SCHEM = \'"
+ + schemaName + "\' " : "")
+ + "AND LINK_TYPE = " +
PTable.LinkType.INDEX_TABLE.getSerializedValue()
+ + (tenantId != null ? " AND TENANT_ID = \'" + tenantId + "\'"
: "");
+ ArrayList<String> viewIndexes = new ArrayList<>();
+ ResultSet
+ rs =
+ conn.createStatement().executeQuery(viewIndexesSql);
+ while(rs.next()) {
+ String viewIndexName = rs.getString(1);
+ viewIndexes.add(viewIndexName);
+ }
+ return viewIndexes;
+ }
+
private class IndexInfo {
final private String schemaName;
final private String baseTable;
@@ -635,4 +665,9 @@ public class IndexUpgradeTool extends Configured {
return indexName;
}
}
+
+ public static void main (String[] args) throws Exception {
+ int result = ToolRunner.run(new IndexUpgradeTool(), args);
+ System.exit(result);
+ }
}