gjacoby126 commented on a change in pull request #520: PHOENIX-5333: A tool to
upgrade existing tables/indexes to use self-c…
URL: https://github.com/apache/phoenix/pull/520#discussion_r295526141
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
##########
@@ -0,0 +1,456 @@
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+import java.util.logging.Logger;
+import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.logging.FileHandler;
+import java.util.logging.SimpleFormatter;
+
+
+public class IndexUpgradeTool extends Configured {
+
+ private static final Logger LOGGER =
Logger.getLogger(IndexUpgradeTool.class.getName());
+
+ private static final Option OPERATION_OPTION = new Option("o",
"operation", true,
+ "[Required]Operation to perform (upgrade/rollback)");
+ private static final Option TABLE_OPTION = new Option("tb", "table", true,
+ "[Required]Tables and indexes list ex. table1: index1, index2,
index3; table2: index4, index5");
+ private static final Option TABLE_CSV_FILE_OPTION = new Option("f",
"file", true,
+ "[Optional]Tables and indexes list in a csv file");
+ private static final Option DRY_RUN_OPTION = new Option("s", "show", false,
+ "[Optional]If passed this will output steps that will be
executed");
+ private static final Option HELP_OPTION = new Option("h", "help", false,
"Help");
+ private static final Option LOG_FILE_OPTION = new Option("lf", "logfile",
true,
+ "Log file path where the logs are written");
+ private static final Option HOST_OPTION = new Option("ho", "host", true,
"[Required]zookeeper quorum host");
+ private static final Option PORT_OPTION = new Option("po", "port", true,
"[Required]zookeeper port");
+
+ private static final String UPGRADE_OP = "upgrade";
+ private static final String ROLLBACK_OP = "rollback";
+
+
+ private static HashMap <String, Boolean> tableMC = new HashMap<>();
+ private static Map<String, String> prop = new HashMap<>();
+
+ private static String connectionURL;
+ private static boolean dryRun;
+
+ public static void main (String[] args) {
+ CommandLine cmdLine = null;
+ try {
+ try {
+ cmdLine = parseOptions(args);
+ } catch (IllegalStateException e) {
+ printHelpAndExit(e.getMessage(), getOptions());
+ }
+ String operation =
cmdLine.getOptionValue(OPERATION_OPTION.getOpt());
+ String tables = cmdLine.getOptionValue(TABLE_OPTION.getOpt());
+ String host = cmdLine.getOptionValue(HOST_OPTION.getOpt());
+ String port = cmdLine.getOptionValue(PORT_OPTION.getOpt());
+
+ String logFile = cmdLine.getOptionValue(LOG_FILE_OPTION.getOpt());
+ if (logFile != null) {
+ FileHandler fh = new FileHandler(logFile);
+ fh.setFormatter(new SimpleFormatter());
+ LOGGER.addHandler(fh);
+ }
+ if (dryRun) {
+ LOGGER.info("This is the beginning of the tool with dry run.");
+ }
+ LOGGER.info("Index Upgrade tool initiated: "+ String.join(",",
args));
+
+ prop.put(Indexer.INDEX_BUILDER_CONF_KEY,
PhoenixIndexBuilder.class.getName());
+ prop.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY,
PhoenixIndexCodec.class.getName());
+
+ connectionURL = "jdbc:phoenix:"+host+":"+port;
+
+ if (tables == null) {
+ String tablesCsv =
cmdLine.getOptionValue(TABLE_CSV_FILE_OPTION.getOpt());
+ tables = new String(Files.readAllBytes(Paths.get(tablesCsv)));
+ }
+ LOGGER.info("list of tables/indexes passed: "+ tables);
+
+ if (tables == null) {
+ LOGGER.severe("tables' list is not available; use -tb or -f
option");
+ System.exit(1);
+ }
+
+ HashMap<String, ArrayList<String>> tablesAndIndexes =
getTablesAndIndexes(tables);
+
+ if (operation.equalsIgnoreCase(UPGRADE_OP)) {
+ LOGGER.info("Executing upgrade");
+ upgradeConsistencyFeature(tablesAndIndexes);
+ } else if (operation.equalsIgnoreCase(ROLLBACK_OP)) {
+ LOGGER.info("Executing rollback");
+ rollbackConsistencyFeature(tablesAndIndexes);
+ } else {
+ throw new IllegalStateException("Invalid option provided for
"+OPERATION_OPTION.getOpt()+" expected values: {upgrade, rollback}");
+ }
+
+ } catch (IOException e) {
+ LOGGER.severe("Something went wrong "+e);
+ }
+ }
+
+ private static void upgradeConsistencyFeature(HashMap<String,
ArrayList<String>> tablesAndIndexes) {
+
+ try {
+ Connection conn = DriverManager.getConnection(connectionURL, new
Properties());
+ ConnectionQueryServices queryServices =
conn.unwrap(PhoenixConnection.class).getQueryServices();
+ Configuration conf = queryServices.getConfiguration();
+ tablesAndIndexes.entrySet().parallelStream().forEach(entry -> {
+
+ try (Admin admin = queryServices.getAdmin()) {
+
+ String fullTableName = entry.getKey();
+ PTable dataTable = PhoenixRuntime.getTableNoCache(conn,
fullTableName);
+ LOGGER.fine("Executing upgrade for "+ fullTableName);
+
+ boolean mutable = !(dataTable.isImmutableRows());
+
+ checkAndWaitForMajorCompaction(admin, entry);
+
+ resetMajorCompaction(admin, entry);
+ if (!dryRun) {
+ admin.disableTable(TableName.valueOf(fullTableName));
+ }
+ LOGGER.info("Disabled data table "+ fullTableName);
+
+ for (String indexName : entry.getValue()) {
+ if (!dryRun) {
+ admin.disableTable(TableName.valueOf(indexName));
+ }
+ LOGGER.info("Disabled index table "+ indexName);
+ TableDescriptorBuilder indexTableDescBuilder =
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(indexName)));
+ if
(!admin.getDescriptor(TableName.valueOf(indexName)).hasCoprocessor(GlobalIndexChecker.class.getName()))
{
+
+ if (!dryRun) {
+
indexTableDescBuilder.addCoprocessor(GlobalIndexChecker.class.getName(), null,
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+ }
+ LOGGER.info("Loaded GlobalIndexChecker coprocessor
on index table "+indexName);
+ }
+ if (!dryRun) {
+ admin.modifyTable(indexTableDescBuilder.build());
+ }
+ }
+ if (mutable) {
+ LOGGER.fine(fullTableName+" is mutable");
+ TableDescriptorBuilder dataTableDescBuilder =
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(fullTableName)));
+
if(admin.getDescriptor(TableName.valueOf(fullTableName)).hasCoprocessor(Indexer.class.getName()))
{
+ if (!dryRun) {
+
dataTableDescBuilder.removeCoprocessor(Indexer.class.getName());
+ }
+ LOGGER.info("Unloaded Indexer coprocessor on data
table "+fullTableName);
+ }
+
if(!admin.getDescriptor(TableName.valueOf(fullTableName)).hasCoprocessor(IndexRegionObserver.class.getName()))
{
+ if(!dryRun) {
+
dataTableDescBuilder.addCoprocessor(IndexRegionObserver.class.getName(), null,
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+ }
+ LOGGER.info("Loaded IndexRegionObserver
coprocessor on data table "+fullTableName);
+ }
+ if(!dryRun) {
+ admin.modifyTable(dataTableDescBuilder.build());
+ }
+ }
+ for (String indexName : entry.getValue()) {
+ if(!dryRun) {
+ admin.enableTable(TableName.valueOf(indexName));
+ }
+ LOGGER.info("Enabled index table "+ indexName);
+ }
+ if (!dryRun) {
+ admin.enableTable(TableName.valueOf(fullTableName));
+ }
+ LOGGER.info("Enabled data table "+ fullTableName);
+
+ rebuildIndexes(entry, conf);
+ setMajorCompaction(admin, entry);
+ tableMC.clear();
+
+ } catch (IOException | SQLException e) {
+ LOGGER.severe("Something went wrong while executing
upgrade steps "+ e);
+ }
+ });
+ } catch (SQLException e) {
+ LOGGER.severe("Something went wrong in upgradeConsistencyFeature
"+ e);
+ }
+ }
+
+ private static void rollbackConsistencyFeature(HashMap<String,
ArrayList<String>> tablesAndIndexes) {
+ try (Connection conn = DriverManager.getConnection(connectionURL,new
Properties())) {
+ ConnectionQueryServices queryServices =
conn.unwrap(PhoenixConnection.class).getQueryServices();
+
+ for (Map.Entry<String, ArrayList<String>> entry :
tablesAndIndexes.entrySet()) {
+ String fullTableName = entry.getKey();
+ PTable dataTable = PhoenixRuntime.getTableNoCache(conn,
fullTableName);
+ LOGGER.fine("Executing rollback for "+ fullTableName);
+ boolean mutable = !(dataTable.isImmutableRows());
+ try (Admin admin = queryServices.getAdmin()) {
+ if (!dryRun) {
+ admin.disableTable(TableName.valueOf(fullTableName));
+ }
+ LOGGER.info("Disabled data table "+ fullTableName);
+
+ TableDescriptorBuilder dataTableDescBuilder =
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(fullTableName)));
+ if (mutable) {
+ if
(admin.getDescriptor(TableName.valueOf(fullTableName)).hasCoprocessor(IndexRegionObserver.class.getName()))
{
+ if (!dryRun) {
+
dataTableDescBuilder.removeCoprocessor(IndexRegionObserver.class.getName());
+ }
+ LOGGER.info("Unloaded IndexRegionObserver
coprocessor on data table "+fullTableName);
+ }
+ if
(!admin.getDescriptor(TableName.valueOf(fullTableName)).hasCoprocessor(Indexer.class.getName()))
{
+ if (!dryRun) {
+
dataTableDescBuilder.addCoprocessor(Indexer.class.getName(), null,
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+ }
+ LOGGER.info("Loaded Indexer coprocessor on data
table "+fullTableName);
+ }
+ if (!dryRun) {
+ admin.modifyTable(dataTableDescBuilder.build());
+ }
+ }
+ for (String indexName : entry.getValue()) {
+ if (!dryRun) {
+ admin.disableTable(TableName.valueOf(indexName));
+ }
+ LOGGER.info("Disabled index table "+ indexName);
+ TableDescriptorBuilder indexTableDescBuilder =
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(indexName)));
+ if
(admin.getDescriptor(TableName.valueOf(indexName)).hasCoprocessor(GlobalIndexChecker.class.getName()))
{
+
indexTableDescBuilder.removeCoprocessor(GlobalIndexChecker.class.getName());
+ LOGGER.info("Unloaded GlobalIndexChecker
coprocessor on index table "+indexName);
+ }
+ if (!dryRun) {
+ admin.modifyTable(indexTableDescBuilder.build());
+ }
+ }
+ for (String indexName : entry.getValue()) {
+ if (!dryRun) {
+ admin.enableTable(TableName.valueOf(indexName));
+ }
+ LOGGER.info("Enabled index table "+ indexName);
+ }
+ if (!dryRun) {
+ admin.enableTable(TableName.valueOf(fullTableName));
+ }
+ LOGGER.info("Enabled data table "+ fullTableName);
+ } catch (IOException | SQLException e) {
+ LOGGER.severe("Something went wrong while rolling back the
feature "+ e);
+ }
+ }
+
+ } catch (SQLException e) {
+ LOGGER.severe("Something went wrong in rollbackConsistencyFeature
"+ e);
+ }
+ }
+
+ private static void checkAndWaitForMajorCompaction(Admin admin,
Map.Entry<String, ArrayList<String>> value) {
+ value.getValue().parallelStream().forEach(table -> {
+ try {
+ CompactionState cs =
admin.getCompactionState(TableName.valueOf(table));
+ while (cs.equals(CompactionState.MAJOR)) {
+ LOGGER.fine("Major compaction state is " + cs.name() +"
for "+table+" sleeping for 5 secs before next check");
+ if (!dryRun) {
+ Thread.sleep(5000);
+ cs =
admin.getCompactionState(TableName.valueOf(table));
+ }
+ }
+ if (!dryRun) {
+ LOGGER.fine("Major compaction state is " + cs.name() + "
for " + table);
+ }
+ } catch (InterruptedException | IOException e) {
+ LOGGER.severe("Something went wrong while checking and waiting
for Major Compaction for table " + table+e);
+ }
+ });
+ }
+
+ private static void resetMajorCompaction(Admin admin, Map.Entry<String,
ArrayList<String>> value) {
+ value.getValue().parallelStream().forEach(table -> {
+ try {
+ tableMC.put(table,
admin.getDescriptor(TableName.valueOf(table)).isCompactionEnabled());
+ if (!dryRun) {
+ TableDescriptorBuilder tdb =
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(table)));
+ tdb.setCompactionEnabled(false);
+ admin.modifyTable(tdb.build());
+ }
+ LOGGER.info("Major compaction disabled for "+table);
+
+ } catch (IOException e) {
+ LOGGER.severe("Something went wrong while setting Major
Compaction to false for "+table+" "+e);
+ }
+ });
+ }
+
+ private static void setMajorCompaction(Admin admin, Map.Entry<String,
ArrayList<String>> value) {
+ value.getValue().parallelStream().forEach(table -> {
+ try {
+ if (!dryRun) {
+ TableDescriptorBuilder tdb =
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(table)));
+ tdb.setCompactionEnabled(tableMC.get(table));
+ admin.modifyTable(tdb.build());
+ }
+ LOGGER.info("Major compaction set to original value
{"+tableMC.get(table)+"} for "+table);
+ } catch (IOException e) {
+ LOGGER.severe("Something went wrong while setting Major
Compaction to old value "+table+" "+e);
+ }
+ });
+ }
+
+ private static void rebuildIndexes(Map.Entry<String, ArrayList<String>>
value, Configuration conf) {
+ String schema = value.getKey().split("\\.")[0];
+ String table = value.getKey().split("\\.")[1];
+
+ value.getValue().stream().forEach(index -> {
Review comment:
@swaroopak - unfortunately, we can't use Java 8+ features such as the Stream
API in Phoenix 4.x, because HBase 1.x doesn't support them. Since this PR is
bound for master, which is 5.x, it's technically OK here, but you'd have to
rewrite this code for the 4.x backport, which is probably more divergence than
we want to maintain.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services