gokceni commented on a change in pull request #520: PHOENIX-5333: A tool to upgrade existing tables/indexes to use self-c…
URL: https://github.com/apache/phoenix/pull/520#discussion_r297353279
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+import java.util.logging.Logger;
+import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.logging.FileHandler;
+import java.util.logging.SimpleFormatter;
+
+public class IndexUpgradeTool extends Configured {
+
+    private static final Logger LOGGER = Logger.getLogger(IndexUpgradeTool.class.getName());
+
+    private static final Option OPERATION_OPTION = new Option("o", "operation",
+            true,
+            "[Required]Operation to perform (upgrade/rollback)");
+    private static final Option TABLE_OPTION = new Option("tb", "table", true,
+            "[Required]Tables and indexes list ex. table1:index1,index2,index3;" + "table2:index4,index5");
+    private static final Option TABLE_CSV_FILE_OPTION = new Option("f", "file",
+            true,
+            "[Optional]Tables and indexes list in a csv file");
+    private static final Option DRY_RUN_OPTION = new Option("d", "dry-run",
+            false,
+            "[Optional]If passed this will output steps that will be executed");
+    private static final Option HELP_OPTION = new Option("h", "help",
+            false, "Help");
+    private static final Option LOG_FILE_OPTION = new Option("lf", "logfile",
+            true,
+            "Log file path where the logs are written");
+    private static final Option HOST_OPTION = new Option("ho", "host",
+            true, "[Required]zookeeper quorum host");
+    private static final Option PORT_OPTION = new Option("po", "port",
+            true, "[Required]zookeeper port");
+    private static final Option COMPACTION_OPTION = new Option("c", "compaction",
+            true, "[Optional]enable/disable compaction will be handled by the tool");
+
+    public static final String UPGRADE_OP = "upgrade";
+    public static final String ROLLBACK_OP = "rollback";
+
+
+    private HashMap <String, Boolean> tableMC = new HashMap<>();
+    private HashMap<String, ArrayList<String>> tablesAndIndexes;
+    private HashMap<String, String> prop = new HashMap<>();
+
+    private boolean dryRun, upgrade, compaction;
+    private String connectionURL;
+    private String operation;
+    private String inputTables;
+    private String logFile;
+    private String inputFile;
+
+    public void setDryRun(boolean dryRun) {
+        this.dryRun = dryRun;
+    }
+
+    public void setInputTables(String inputTables) {
+        this.inputTables = inputTables;
+    }
+
+    public void setLogFile(String logFile) {
+        this.logFile = logFile;
+    }
+
+    public void setInputFile(String inputFile) {
+        this.inputFile = inputFile;
+    }
+
+    public boolean getDryRun() {
+        return this.dryRun;
+    }
+
+    public String getInputTables() {
+        return this.inputTables;
+    }
+
+    public String getLogFile() {
+        return this.logFile;
+    }
+
+    public String getOperation() {
+        return operation;
+    }
+
+    public String getConnectionURL() {
+        return connectionURL;
+    }
+
+    public boolean getCompaction() {
+        return compaction;
+    }
+
+    public static void main (String[] args) {
+        CommandLine cmdLine = null;
+
+        IndexUpgradeTool iut = new IndexUpgradeTool();
+        try {
+            cmdLine = iut.parseOptions(args);
+            LOGGER.info("Index Upgrade tool initiated: "+ String.join(",", args));
+        } catch (IllegalStateException e) {
+            iut.printHelpAndExit(e.getMessage(), iut.getOptions());
+        }
+        iut.initializeTool(cmdLine);
+        iut.prepareToolSetup();
+        iut.executeTool();
+    }
+
+    public IndexUpgradeTool(String mode, String tables, String inputFile, String host, String port,
+            String outputFile, boolean dryRun, boolean compaction) {
+        this.operation = mode;
+        this.inputTables = tables;
+        this.inputFile = inputFile;
+        this.connectionURL = "jdbc:phoenix:" + host + ":" + port;
+        this.logFile = outputFile;
+        this.dryRun = dryRun;
+        this.compaction = compaction;
+    }
+
+    public IndexUpgradeTool () { }
+
+    /**
+     * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
+     * missing.
+     * @param args supplied command line arguments
+     * @return the parsed command line
+     */
+    @VisibleForTesting
+    public CommandLine parseOptions(String[] args) {
+
+        final Options options = getOptions();
+
+        CommandLineParser parser = new DefaultParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("severe parsing command line options: " + e.getMessage(),
+                    options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        if (!cmdLine.hasOption(OPERATION_OPTION.getOpt())) {
+            throw new IllegalStateException(OPERATION_OPTION.getLongOpt()
+                    +" is a mandatory parameter");
+        }
+
+        if (cmdLine.hasOption(DRY_RUN_OPTION.getOpt())
+                && !cmdLine.hasOption(LOG_FILE_OPTION.getOpt())) {
+            throw new IllegalStateException("Log file with "+TABLE_OPTION.getLongOpt()
+                    + " is mandatory if " + DRY_RUN_OPTION.getLongOpt() +" is passed");
+        }
+
+        if (!(cmdLine.hasOption(TABLE_OPTION.getOpt()))
+                && !(cmdLine.hasOption(TABLE_CSV_FILE_OPTION.getOpt()))) {
+            throw new IllegalStateException("Tables and indexes list should be passed in either with"
+                    +TABLE_OPTION.getLongOpt() + " or " + TABLE_CSV_FILE_OPTION.getLongOpt());
+        }
+        if (!((cmdLine.hasOption(HOST_OPTION.getOpt()))
+                && (cmdLine.hasOption(PORT_OPTION.getOpt())))) {
+            throw new IllegalStateException(HOST_OPTION.getLongOpt() +" and "
+                    + PORT_OPTION.getLongOpt() +" are mandatory parameters");
+        }
+        return cmdLine;
+    }
+
+    private void printHelpAndExit(String severeMessage, Options options) {
+        System.err.println(severeMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(OPERATION_OPTION);
+        TABLE_OPTION.setOptionalArg(true);
+        options.addOption(TABLE_OPTION);
+        TABLE_CSV_FILE_OPTION.setOptionalArg(true);
+        options.addOption(TABLE_CSV_FILE_OPTION);
+        DRY_RUN_OPTION.setOptionalArg(true);
+        options.addOption(DRY_RUN_OPTION);
+        LOG_FILE_OPTION.setOptionalArg(true);
+        options.addOption(LOG_FILE_OPTION);
+        options.addOption(HELP_OPTION);
+        options.addOption(HOST_OPTION);
+        options.addOption(PORT_OPTION);
+        COMPACTION_OPTION.setOptionalArg(true);
+        options.addOption(COMPACTION_OPTION);
+        return options;
+    }
+
+    @VisibleForTesting
+    public void initializeTool(CommandLine cmdLine) {
+        operation = cmdLine.getOptionValue(OPERATION_OPTION.getOpt());
+        inputTables = cmdLine.getOptionValue(TABLE_OPTION.getOpt());
+        connectionURL = "jdbc:phoenix:" + cmdLine.getOptionValue(HOST_OPTION.getOpt()) +":"
+                + cmdLine.getOptionValue(PORT_OPTION.getOpt());
+        logFile = cmdLine.getOptionValue(LOG_FILE_OPTION.getOpt());
+        inputFile = cmdLine.getOptionValue(TABLE_CSV_FILE_OPTION.getOpt());
+        dryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
+        compaction = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
+    }
+
+    @VisibleForTesting
+    public void prepareToolSetup() {
+        try {
+            if (logFile != null) {
+                FileHandler fh = new FileHandler(logFile);
+                fh.setFormatter(new SimpleFormatter());
+                LOGGER.addHandler(fh);
+            }
+
+            prop.put(Indexer.INDEX_BUILDER_CONF_KEY, PhoenixIndexBuilder.class.getName());
+            prop.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+
+            if (inputTables == null) {
+                inputTables = new String(Files.readAllBytes(Paths.get(inputFile)));
+            }
+            if (inputTables == null) {
+                LOGGER.severe("tables' list is not available; use -tb or -f option");
+                System.exit(1);
+            }
+            LOGGER.info("list of tables/indexes passed: " + inputTables);
+
+            tablesAndIndexes = extractTablesAndIndexes(inputTables);
+
+            if (operation.equalsIgnoreCase(UPGRADE_OP)) {
+                upgrade = true;
+            } else if (operation.equalsIgnoreCase(ROLLBACK_OP)) {
+                upgrade = false;
+            } else {
+                throw new IllegalStateException("Invalid option provided for "
+                        + OPERATION_OPTION.getOpt() + " expected values: {upgrade, rollback}");
+            }
+            if (dryRun) {
+                LOGGER.info("This is the beginning of the tool with dry run.");
+            }
+        } catch (IOException e) {
+            LOGGER.severe("Something went wrong "+e);
+        }
+    }
+
+    @VisibleForTesting
+    public void executeTool() {
+        try (Connection conn = DriverManager.getConnection(connectionURL, new Properties())) {
+            ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class)
+                    .getQueryServices();
+            Configuration conf = queryServices.getConfiguration();
+            executeTool(conn, queryServices, conf);
+        } catch (SQLException e) {
+            LOGGER.severe("Something went wrong in executing tool "+ e);
+        }
+    }
+
+    private void executeTool(Connection conn, ConnectionQueryServices queryServices,
+            Configuration conf) {
+
+        LOGGER.info("Executing "+ operation);
+
+        for (Map.Entry<String, ArrayList<String>> entry: tablesAndIndexes.entrySet()) {
+            String dataTableFullName = entry.getKey();
+            ArrayList<String> indexes = entry.getValue();
+
+            try (Admin admin = queryServices.getAdmin()) {
+
+                PTable dataTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
+                LOGGER.fine("Executing "+operation+" for " + dataTableFullName);
+
+                boolean mutable = !(dataTable.isImmutableRows());
+
+                if (upgrade) {
+                    if (compaction) {
+                        setCompactionEnabled(admin, entry, false);
+                        checkAndWaitForMajorCompaction(admin, entry);
+                    }
+                    disableTable(admin, entry);
+                    modifyIndexTable(admin, indexes);
+                    if (mutable) {
+                        modifyDataTable(admin, dataTableFullName);
+                    }
+                    enableTable(admin, entry);
+                    rebuildIndexes(entry, conf);
+                    if (compaction) {
+                        setCompactionEnabled(admin, entry, true);
+                    }
+                } else {
+                    disableTable(admin, entry);
+                    if(mutable) {
+                        modifyDataTable(admin, dataTableFullName);
+                    }
+                    modifyIndexTable(admin, indexes);

Review comment:
   Could you clean IndexNameGlobalIndexCheckerEnabledMap at this point? My PR is introducing it. It is in indexutil
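To make the request above concrete, here is a rough sketch of the kind of cleanup being asked for in the rollback branch. The map lives in IndexUtil but is introduced by the reviewer's separate PR, so the accessor used below is hypothetical and only illustrates the shape of the change (dropping each rolled-back index's cached entry after modifyIndexTable):

    // Hypothetical sketch: IndexNameGlobalIndexCheckerEnabledMap and any accessor for it
    // come from a separate, in-flight PR, so the real API in IndexUtil may differ.
    for (String indexName : indexes) {
        // Remove the stale "GlobalIndexChecker enabled" entry for an index whose
        // coprocessors were just rolled back, so no cached state is left behind.
        IndexUtil.getIndexNameGlobalIndexCheckerEnabledMap().remove(indexName);
    }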

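For readers skimming the diff, here is a minimal sketch of driving the tool programmatically. It relies only on the public constructor, prepareToolSetup(), and executeTool() shown in the diff above; the cluster, table, and path names are placeholders:

    import org.apache.phoenix.mapreduce.index.IndexUpgradeTool;

    public class IndexUpgradeToolExample {
        public static void main(String[] args) {
            // Placeholder cluster/table names; the constructor arguments mirror the
            // command line options: operation, table list, csv file, host, port,
            // log file, dry run, and compaction handling.
            IndexUpgradeTool tool = new IndexUpgradeTool(
                    IndexUpgradeTool.UPGRADE_OP,            // "upgrade" or "rollback"
                    "TABLE1:INDEX1,INDEX2;TABLE2:INDEX3",   // same format as the -tb option
                    null,                                   // no csv file with the table list
                    "zk1.example.com", "2181",              // zookeeper quorum host and port
                    "/tmp/index-upgrade.log",               // log file for the tool's output
                    true,                                   // dry run: only log the planned steps
                    false);                                 // tool does not toggle compaction
            tool.prepareToolSetup();
            tool.executeTool();
        }
    }

The same run corresponds roughly to the command-line form -o upgrade -tb "TABLE1:INDEX1,INDEX2;TABLE2:INDEX3" -ho zk1.example.com -po 2181 -lf /tmp/index-upgrade.log -d, given the options defined at the top of the class.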