priyankporwal commented on a change in pull request #520: PHOENIX-5333: A tool 
to upgrade existing tables/indexes to use self-c…
URL: https://github.com/apache/phoenix/pull/520#discussion_r295509686
 
 

 ##########
 File path: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
 ##########
 @@ -0,0 +1,459 @@
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+import java.util.logging.Logger;
+import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.logging.FileHandler;
+import java.util.logging.SimpleFormatter;
+
+
+public class IndexUpgradeTool extends Configured {
+
+    // java.util.logging logger for the tool; main() may attach a FileHandler to it.
+    private static final Logger LOGGER = 
Logger.getLogger(IndexUpgradeTool.class.getName());
+
+    // Command-line options. The third constructor argument states whether the option
+    // takes a value; DRY_RUN_OPTION and HELP_OPTION are declared without one (pure flags).
+    private static final Option OPERATION_OPTION = new Option("o", 
"operation", true,
+            "[Required]Operation to perform (upgrade/rollback)");
+    private static final Option TABLE_OPTION = new Option("tb", "table", true,
+            "[Required]Tables and indexes list ex. table1: index1, index2, 
index3; table2: index4, index5");
+    private static final Option TABLE_CSV_FILE_OPTION = new Option("f", 
"file", true,
+            "[Optional]Tables and indexes list in a csv file");
+    private static final Option DRY_RUN_OPTION = new Option("s", "show", false,
+            "[Optional]If passed this will output steps that will be 
executed");
+    private static final Option HELP_OPTION = new Option("h", "help", false, 
"Help");
+    private static final Option CONNECTION_STRING_OPTION = new Option("c", 
"connection-string", true, "Connection string");
+    private static final Option LOG_FILE_OPTION = new Option("lf", "logfile", 
true,
+            "Log file path where the logs are written");
+    private static final Option HOST_OPTION = new Option("ho", "host", true, 
"[Required]zookeeper quorum host");
+    private static final Option PORT_OPTION = new Option("po", "port", true, 
"[Required]zookeeper port");
+
+    // Accepted values of OPERATION_OPTION; main() compares them case-insensitively.
+    private static final String UPGRADE_OP = "upgrade";
+    private static final String ROLLBACK_OP = "rollback";
+
+
+    // Shared mutable state, cleared by upgradeConsistencyFeature.
+    // NOTE(review): presumably records a per-table major-compaction setting between
+    // resetMajorCompaction and setMajorCompaction - those helpers are outside this view; confirm.
+    private static HashMap <String, Boolean> tableMC = new HashMap<>();
+    // Coprocessor properties; main() fills in the index builder and codec class names and
+    // the upgrade/rollback code passes the map to addCoprocessor.
+    private static Map<String, String> prop = new  HashMap<>();
+
+    // JDBC URL: the connection-string option, or "jdbc:phoenix:<host>:<port>" when it is absent.
+    private static String connectionURL;
+    // Set in main(); when true the upgrade/rollback code skips Admin mutations but still logs
+    // each step.
+    private static boolean dryRun;
+
+    /**
+     * Command-line entry point of the index upgrade tool.
+     *
+     * <p>Parses the options, optionally attaches a log-file handler, resolves the JDBC
+     * connection URL (the connection-string option, or host and port when it is absent) and
+     * the table/index list (the table option, or the contents of the file option), and then
+     * runs either the upgrade or the rollback.
+     *
+     * @param args command-line arguments
+     * @throws IllegalStateException if the operation is missing or is neither "upgrade" nor
+     *         "rollback"
+     */
+    public static void main (String[] args) {
+        CommandLine cmdLine = null;
+        try {
+            try {
+                cmdLine = parseOptions(args);
+            } catch (IllegalStateException e) {
+                // NOTE(review): printHelpAndExit is defined outside this view; the code below
+                // relies on it terminating the JVM, otherwise cmdLine is still null here.
+                printHelpAndExit(e.getMessage(), getOptions());
+            }
+            String operation = cmdLine.getOptionValue(OPERATION_OPTION.getOpt());
+            String tables = cmdLine.getOptionValue(TABLE_OPTION.getOpt());
+            String host = cmdLine.getOptionValue(HOST_OPTION.getOpt());
+            String port = cmdLine.getOptionValue(PORT_OPTION.getOpt());
+            // DRY_RUN_OPTION is declared without an argument, so getOptionValue() is always
+            // null for it and Boolean.valueOf(null) is false; test for the flag's presence.
+            dryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
+
+            String logFile = cmdLine.getOptionValue(LOG_FILE_OPTION.getOpt());
+            if (logFile != null) {
+                FileHandler fh = new FileHandler(logFile);
+                fh.setFormatter(new SimpleFormatter());
+                LOGGER.addHandler(fh);
+            }
+            if (dryRun) {
+                LOGGER.info("This is the beginning of the tool with dry run.");
+            }
+            LOGGER.info("Index Upgrade tool initiated: "+ String.join(",", args));
+
+            prop.put(Indexer.INDEX_BUILDER_CONF_KEY, PhoenixIndexBuilder.class.getName());
+            prop.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+
+            connectionURL = cmdLine.getOptionValue(CONNECTION_STRING_OPTION.getOpt());
+
+            if (connectionURL == null) {
+                connectionURL = "jdbc:phoenix:"+host+":"+port;
+            }
+
+            if (tables == null) {
+                String tablesCsv = cmdLine.getOptionValue(TABLE_CSV_FILE_OPTION.getOpt());
+                // Check before touching the file system: Paths.get(null) would throw a
+                // NullPointerException and the user would never see this message.
+                if (tablesCsv == null) {
+                    LOGGER.severe("tables' list is not available; use -tb or -f option");
+                    System.exit(1);
+                }
+                // Decode explicitly as UTF-8 instead of the platform default charset.
+                tables = new String(Files.readAllBytes(Paths.get(tablesCsv)),
+                        java.nio.charset.StandardCharsets.UTF_8);
+            }
+            LOGGER.info("list of tables/indexes passed: "+ tables);
+
+            HashMap<String, ArrayList<String>> tablesAndIndexes = getTablesAndIndexes(tables);
+
+            // Constant-first comparison so a missing operation falls through to the
+            // IllegalStateException below instead of a NullPointerException.
+            if (UPGRADE_OP.equalsIgnoreCase(operation)) {
+                LOGGER.info("Executing upgrade");
+                upgradeConsistencyFeature(tablesAndIndexes);
+            } else if (ROLLBACK_OP.equalsIgnoreCase(operation)) {
+                LOGGER.info("Executing rollback");
+                rollbackConsistencyFeature(tablesAndIndexes);
+            } else {
+                throw new IllegalStateException("Invalid option provided for "
+                        +OPERATION_OPTION.getOpt()+" expected values: {upgrade, rollback}");
+            }
+
+        } catch (IOException e) {
+            LOGGER.severe("Something went wrong "+e);
+        }
+    }
+
+    /**
+     * Runs the upgrade steps for every data table in the map, processing the entries with a
+     * parallel stream.
+     *
+     * <p>Per entry: waits for / resets major compaction, disables the data table and its
+     * index tables, adds the GlobalIndexChecker coprocessor to each index table that lacks
+     * it, and - for mutable data tables only - swaps the Indexer coprocessor for
+     * IndexRegionObserver. The tables are then re-enabled, the indexes rebuilt and major
+     * compaction set again. In dry-run mode no Admin mutation is issued but the step
+     * messages are still logged.
+     *
+     * <p>A failure for one entry is logged and does not stop the other entries.
+     *
+     * @param tablesAndIndexes data table name mapped to the names of its index tables
+     */
+    private static void upgradeConsistencyFeature(
+            HashMap<String, ArrayList<String>> tablesAndIndexes) {
+
+        // try-with-resources: the connection used to be left open. The parallel forEach
+        // below only returns once every entry has been processed, so closing it afterwards
+        // cannot pull the connection away from a running worker.
+        try (Connection conn = DriverManager.getConnection(connectionURL, new Properties())) {
+            ConnectionQueryServices queryServices =
+                    conn.unwrap(PhoenixConnection.class).getQueryServices();
+            Configuration conf = queryServices.getConfiguration();
+            tablesAndIndexes.entrySet().parallelStream().forEach(entry -> {
+
+                try (Admin admin = queryServices.getAdmin()) {
+
+                    String fullTableName = entry.getKey();
+                    PTable dataTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+                    LOGGER.fine("Executing upgrade for "+ fullTableName);
+
+                    boolean mutable = !(dataTable.isImmutableRows());
+
+                    checkAndWaitForMajorCompaction(admin, entry);
+
+                    resetMajorCompaction(admin, entry);
+                    if (!dryRun) {
+                        admin.disableTable(TableName.valueOf(fullTableName));
+                    }
+                    LOGGER.info("Disabled data table "+ fullTableName);
+
+                    for (String indexName : entry.getValue()) {
+                        if (!dryRun) {
+                            admin.disableTable(TableName.valueOf(indexName));
+                        }
+                        LOGGER.info("Disabled index table "+ indexName);
+                        TableDescriptorBuilder indexTableDescBuilder =
+                                TableDescriptorBuilder.newBuilder(
+                                        admin.getDescriptor(TableName.valueOf(indexName)));
+                        if (!admin.getDescriptor(TableName.valueOf(indexName))
+                                .hasCoprocessor(GlobalIndexChecker.class.getName())) {
+
+                            if (!dryRun) {
+                                indexTableDescBuilder.addCoprocessor(
+                                        GlobalIndexChecker.class.getName(), null,
+                                        QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+                            }
+                            LOGGER.info("Loaded GlobalIndexChecker coprocessor on index table "
+                                    +indexName);
+                        }
+                        if (!dryRun) {
+                            admin.modifyTable(indexTableDescBuilder.build());
+                        }
+                    }
+                    if (mutable) {
+                        LOGGER.fine(fullTableName+" is mutable");
+                        TableDescriptorBuilder dataTableDescBuilder =
+                                TableDescriptorBuilder.newBuilder(
+                                        admin.getDescriptor(TableName.valueOf(fullTableName)));
+                        if (admin.getDescriptor(TableName.valueOf(fullTableName))
+                                .hasCoprocessor(Indexer.class.getName())) {
+                            if (!dryRun) {
+                                dataTableDescBuilder.removeCoprocessor(Indexer.class.getName());
+                            }
+                            LOGGER.info("Unloaded Indexer coprocessor on data table "
+                                    +fullTableName);
+                        }
+                        if (!admin.getDescriptor(TableName.valueOf(fullTableName))
+                                .hasCoprocessor(IndexRegionObserver.class.getName())) {
+                            if (!dryRun) {
+                                dataTableDescBuilder.addCoprocessor(
+                                        IndexRegionObserver.class.getName(), null,
+                                        QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+                            }
+                            LOGGER.info("Loaded IndexRegionObserver coprocessor on data table "
+                                    +fullTableName);
+                        }
+                        if (!dryRun) {
+                            admin.modifyTable(dataTableDescBuilder.build());
+                        }
+                    }
+                    for (String indexName : entry.getValue()) {
+                        if (!dryRun) {
+                            admin.enableTable(TableName.valueOf(indexName));
+                        }
+                        LOGGER.info("Enabled index table "+ indexName);
+                    }
+                    if (!dryRun) {
+                        admin.enableTable(TableName.valueOf(fullTableName));
+                    }
+                    LOGGER.info("Enabled data table "+ fullTableName);
+
+                    rebuildIndexes(entry, conf);
+                    setMajorCompaction(admin, entry);
+
+                } catch (IOException | SQLException e) {
+                    LOGGER.severe("Something went wrong while executing upgrade steps "+ e);
+                }
+            });
+            // tableMC is a plain HashMap shared by all workers; clearing it from inside the
+            // parallel lambda could wipe state another worker is still using. Clear it once,
+            // after every entry has finished.
+            // NOTE(review): resetMajorCompaction/setMajorCompaction are outside this view and
+            // presumably also touch tableMC concurrently - confirm they are thread-safe.
+            tableMC.clear();
+        } catch (SQLException e) {
+            LOGGER.severe("Something went wrong in upgradeConsistencyFeature "+ e);
+        }
+    }
+
+    /**
+     * Runs the rollback steps for every data table in the map, one table after another.
+     *
+     * <p>Per entry: disables the data table; for mutable data tables swaps the
+     * IndexRegionObserver coprocessor back to Indexer; disables each index table and removes
+     * its GlobalIndexChecker coprocessor; then re-enables the index tables and the data
+     * table. In dry-run mode no Admin mutation or descriptor change is made but the step
+     * messages are still logged.
+     *
+     * <p>A failure for one table is logged and does not stop the remaining tables.
+     *
+     * @param tablesAndIndexes data table name mapped to the names of its index tables
+     */
+    private static void rollbackConsistencyFeature(
+            HashMap<String, ArrayList<String>> tablesAndIndexes) {
+        try (Connection conn = DriverManager.getConnection(connectionURL,new Properties())) {
+            ConnectionQueryServices queryServices =
+                    conn.unwrap(PhoenixConnection.class).getQueryServices();
+
+            for (Map.Entry<String, ArrayList<String>> entry : tablesAndIndexes.entrySet()) {
+                String fullTableName = entry.getKey();
+                try (Admin admin = queryServices.getAdmin()) {
+                    // Looked up inside the per-table try so that a failing lookup is logged
+                    // for this table only instead of aborting the whole loop, matching how
+                    // the upgrade path isolates failures.
+                    PTable dataTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+                    LOGGER.fine("Executing rollback for "+ fullTableName);
+                    boolean mutable = !(dataTable.isImmutableRows());
+
+                    if (!dryRun) {
+                        admin.disableTable(TableName.valueOf(fullTableName));
+                    }
+                    LOGGER.info("Disabled data table "+ fullTableName);
+
+                    TableDescriptorBuilder dataTableDescBuilder =
+                            TableDescriptorBuilder.newBuilder(
+                                    admin.getDescriptor(TableName.valueOf(fullTableName)));
+                    if (mutable) {
+                        if (admin.getDescriptor(TableName.valueOf(fullTableName))
+                                .hasCoprocessor(IndexRegionObserver.class.getName())) {
+                            if (!dryRun) {
+                                dataTableDescBuilder.removeCoprocessor(
+                                        IndexRegionObserver.class.getName());
+                            }
+                            LOGGER.info("Unloaded IndexRegionObserver coprocessor on data table "
+                                    +fullTableName);
+                        }
+                        if (!admin.getDescriptor(TableName.valueOf(fullTableName))
+                                .hasCoprocessor(Indexer.class.getName())) {
+                            if (!dryRun) {
+                                dataTableDescBuilder.addCoprocessor(
+                                        Indexer.class.getName(), null,
+                                        QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+                            }
+                            LOGGER.info("Loaded Indexer coprocessor on data table "
+                                    +fullTableName);
+                        }
+                        if (!dryRun) {
+                            admin.modifyTable(dataTableDescBuilder.build());
+                        }
+                    }
+                    for (String indexName : entry.getValue()) {
+                        if (!dryRun) {
+                            admin.disableTable(TableName.valueOf(indexName));
+                        }
+                        LOGGER.info("Disabled index table "+ indexName);
+                        TableDescriptorBuilder indexTableDescBuilder =
+                                TableDescriptorBuilder.newBuilder(
+                                        admin.getDescriptor(TableName.valueOf(indexName)));
+                        if (admin.getDescriptor(TableName.valueOf(indexName))
+                                .hasCoprocessor(GlobalIndexChecker.class.getName())) {
+                            // Guarded like every other descriptor change in this tool.
+                            if (!dryRun) {
+                                indexTableDescBuilder.removeCoprocessor(
+                                        GlobalIndexChecker.class.getName());
+                            }
+                            LOGGER.info("Unloaded GlobalIndexChecker coprocessor on index table "
+                                    +indexName);
+                        }
+                        if (!dryRun) {
+                            admin.modifyTable(indexTableDescBuilder.build());
+                        }
+                    }
+                    for (String indexName : entry.getValue()) {
+                        if (!dryRun) {
+                            admin.enableTable(TableName.valueOf(indexName));
+                        }
+                        LOGGER.info("Enabled index table "+ indexName);
+                    }
+                    if (!dryRun) {
+                        admin.enableTable(TableName.valueOf(fullTableName));
+                    }
+                    LOGGER.info("Enabled data table "+ fullTableName);
+                } catch (IOException | SQLException e) {
+                    LOGGER.severe("Something went wrong while rolling back the feature "+ e);
+                }
+            }
+
+        } catch (SQLException e) {
+            LOGGER.severe("Something went wrong in rollbackConsistencyFeature "+ e);
+        }
+    }
+
+    private static void checkAndWaitForMajorCompaction(Admin admin, 
Map.Entry<String, ArrayList<String>> value) {
+        value.getValue().parallelStream().forEach(table -> {
+            try {
+                CompactionState cs = 
admin.getCompactionState(TableName.valueOf(table));
 
 Review comment:
   Does this return MAJOR when any of the table regions is running Major 
compaction? What happens when some are running Minor compaction and some are 
running Major compaction?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to