gjacoby126 commented on a change in pull request #520: PHOENIX-5333: A tool to 
upgrade existing tables/indexes to use self-c…
URL: https://github.com/apache/phoenix/pull/520#discussion_r295526141
 
 

 ##########
 File path: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
 ##########
 @@ -0,0 +1,456 @@
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+import java.util.logging.Logger;
+import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.logging.FileHandler;
+import java.util.logging.SimpleFormatter;
+
+
+public class IndexUpgradeTool extends Configured {
+
    // Tool-wide logger; main() attaches a FileHandler when the -lf option is supplied.
    private static final Logger LOGGER = Logger.getLogger(IndexUpgradeTool.class.getName());

    // Command-line options.
    private static final Option OPERATION_OPTION = new Option("o", "operation", true,
            "[Required]Operation to perform (upgrade/rollback)");
    private static final Option TABLE_OPTION = new Option("tb", "table", true,
            "[Required]Tables and indexes list ex. table1: index1, index2, index3; table2: index4, index5");
    private static final Option TABLE_CSV_FILE_OPTION = new Option("f", "file", true,
            "[Optional]Tables and indexes list in a csv file");
    private static final Option DRY_RUN_OPTION = new Option("s", "show", false,
            "[Optional]If passed this will output steps that will be executed");
    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
    private static final Option LOG_FILE_OPTION = new Option("lf", "logfile", true,
            "Log file path where the logs are written");
    private static final Option HOST_OPTION = new Option("ho", "host", true, "[Required]zookeeper quorum host");
    private static final Option PORT_OPTION = new Option("po", "port", true, "[Required]zookeeper port");

    // Accepted values for the -o option.
    private static final String UPGRADE_OP = "upgrade";
    private static final String ROLLBACK_OP = "rollback";

    // Per-table snapshot of the "compaction enabled" flag, saved by resetMajorCompaction()
    // and restored by setMajorCompaction(). NOTE(review): plain HashMap mutated from the
    // parallelStream() callers below — not thread-safe; confirm single-threaded access.
    private static HashMap <String, Boolean> tableMC = new HashMap<>();
    // Coprocessor properties handed to addCoprocessor(); populated in main().
    private static Map<String, String> prop = new  HashMap<>();

    // Phoenix JDBC URL built from -ho/-po in main().
    private static String connectionURL;
    // When true (the -s/show flag), only log the steps; skip every Admin mutation.
    private static boolean dryRun;
+
+    public static void main (String[] args) {
+        CommandLine cmdLine = null;
+        try {
+            try {
+                cmdLine = parseOptions(args);
+            } catch (IllegalStateException e) {
+                printHelpAndExit(e.getMessage(), getOptions());
+            }
+            String operation = 
cmdLine.getOptionValue(OPERATION_OPTION.getOpt());
+            String tables = cmdLine.getOptionValue(TABLE_OPTION.getOpt());
+            String host = cmdLine.getOptionValue(HOST_OPTION.getOpt());
+            String port = cmdLine.getOptionValue(PORT_OPTION.getOpt());
+
+            String logFile = cmdLine.getOptionValue(LOG_FILE_OPTION.getOpt());
+            if (logFile != null) {
+                FileHandler fh = new FileHandler(logFile);
+                fh.setFormatter(new SimpleFormatter());
+                LOGGER.addHandler(fh);
+            }
+            if (dryRun) {
+                LOGGER.info("This is the beginning of the tool with dry run.");
+            }
+            LOGGER.info("Index Upgrade tool initiated: "+ String.join(",", 
args));
+
+            prop.put(Indexer.INDEX_BUILDER_CONF_KEY, 
PhoenixIndexBuilder.class.getName());
+            prop.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, 
PhoenixIndexCodec.class.getName());
+
+            connectionURL = "jdbc:phoenix:"+host+":"+port;
+
+            if (tables == null) {
+                String tablesCsv = 
cmdLine.getOptionValue(TABLE_CSV_FILE_OPTION.getOpt());
+                tables = new String(Files.readAllBytes(Paths.get(tablesCsv)));
+            }
+            LOGGER.info("list of tables/indexes passed: "+ tables);
+
+            if (tables == null) {
+                LOGGER.severe("tables' list is not available; use -tb or -f 
option");
+                System.exit(1);
+            }
+
+            HashMap<String, ArrayList<String>> tablesAndIndexes = 
getTablesAndIndexes(tables);
+
+            if (operation.equalsIgnoreCase(UPGRADE_OP)) {
+                LOGGER.info("Executing upgrade");
+                upgradeConsistencyFeature(tablesAndIndexes);
+            } else if (operation.equalsIgnoreCase(ROLLBACK_OP)) {
+                LOGGER.info("Executing rollback");
+                rollbackConsistencyFeature(tablesAndIndexes);
+            } else {
+                throw new IllegalStateException("Invalid option provided for 
"+OPERATION_OPTION.getOpt()+" expected values: {upgrade, rollback}");
+            }
+
+        } catch (IOException e) {
+            LOGGER.severe("Something went wrong "+e);
+        }
+    }
+
+    private static void upgradeConsistencyFeature(HashMap<String, 
ArrayList<String>> tablesAndIndexes) {
+
+        try {
+            Connection conn = DriverManager.getConnection(connectionURL, new 
Properties());
+            ConnectionQueryServices queryServices = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+            Configuration conf = queryServices.getConfiguration();
+            tablesAndIndexes.entrySet().parallelStream().forEach(entry -> {
+
+                try (Admin admin = queryServices.getAdmin()) {
+
+                    String fullTableName = entry.getKey();
+                    PTable dataTable = PhoenixRuntime.getTableNoCache(conn, 
fullTableName);
+                    LOGGER.fine("Executing upgrade for "+ fullTableName);
+
+                    boolean mutable = !(dataTable.isImmutableRows());
+
+                    checkAndWaitForMajorCompaction(admin, entry);
+
+                    resetMajorCompaction(admin, entry);
+                    if (!dryRun) {
+                        admin.disableTable(TableName.valueOf(fullTableName));
+                    }
+                    LOGGER.info("Disabled data table "+ fullTableName);
+
+                    for (String indexName : entry.getValue()) {
+                        if (!dryRun) {
+                            admin.disableTable(TableName.valueOf(indexName));
+                        }
+                        LOGGER.info("Disabled index table "+ indexName);
+                        TableDescriptorBuilder indexTableDescBuilder = 
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(indexName)));
+                        if 
(!admin.getDescriptor(TableName.valueOf(indexName)).hasCoprocessor(GlobalIndexChecker.class.getName()))
 {
+
+                            if (!dryRun) {
+                                
indexTableDescBuilder.addCoprocessor(GlobalIndexChecker.class.getName(), null, 
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+                            }
+                            LOGGER.info("Loaded GlobalIndexChecker coprocessor 
on index table "+indexName);
+                        }
+                        if (!dryRun) {
+                            admin.modifyTable(indexTableDescBuilder.build());
+                        }
+                    }
+                    if (mutable) {
+                        LOGGER.fine(fullTableName+" is mutable");
+                        TableDescriptorBuilder dataTableDescBuilder = 
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(fullTableName)));
+                        
if(admin.getDescriptor(TableName.valueOf(fullTableName)).hasCoprocessor(Indexer.class.getName()))
 {
+                            if (!dryRun) {
+                                
dataTableDescBuilder.removeCoprocessor(Indexer.class.getName());
+                            }
+                            LOGGER.info("Unloaded Indexer coprocessor on data 
table "+fullTableName);
+                        }
+                        
if(!admin.getDescriptor(TableName.valueOf(fullTableName)).hasCoprocessor(IndexRegionObserver.class.getName()))
 {
+                            if(!dryRun) {
+                                
dataTableDescBuilder.addCoprocessor(IndexRegionObserver.class.getName(), null, 
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+                            }
+                            LOGGER.info("Loaded IndexRegionObserver 
coprocessor on data table "+fullTableName);
+                        }
+                        if(!dryRun) {
+                            admin.modifyTable(dataTableDescBuilder.build());
+                        }
+                    }
+                    for (String indexName : entry.getValue()) {
+                        if(!dryRun) {
+                            admin.enableTable(TableName.valueOf(indexName));
+                        }
+                        LOGGER.info("Enabled index table "+ indexName);
+                    }
+                    if (!dryRun) {
+                        admin.enableTable(TableName.valueOf(fullTableName));
+                    }
+                    LOGGER.info("Enabled data table "+ fullTableName);
+
+                    rebuildIndexes(entry, conf);
+                    setMajorCompaction(admin, entry);
+                    tableMC.clear();
+
+                } catch (IOException | SQLException e) {
+                    LOGGER.severe("Something went wrong while executing 
upgrade steps "+ e);
+                }
+            });
+        } catch (SQLException e) {
+            LOGGER.severe("Something went wrong in upgradeConsistencyFeature 
"+ e);
+        }
+    }
+
+    private static void rollbackConsistencyFeature(HashMap<String, 
ArrayList<String>> tablesAndIndexes) {
+        try (Connection conn = DriverManager.getConnection(connectionURL,new 
Properties())) {
+            ConnectionQueryServices queryServices = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
+
+            for (Map.Entry<String, ArrayList<String>> entry : 
tablesAndIndexes.entrySet()) {
+                String fullTableName = entry.getKey();
+                PTable dataTable = PhoenixRuntime.getTableNoCache(conn, 
fullTableName);
+                LOGGER.fine("Executing rollback for "+ fullTableName);
+                boolean mutable = !(dataTable.isImmutableRows());
+                try (Admin admin = queryServices.getAdmin()) {
+                    if (!dryRun) {
+                        admin.disableTable(TableName.valueOf(fullTableName));
+                    }
+                    LOGGER.info("Disabled data table "+ fullTableName);
+
+                    TableDescriptorBuilder dataTableDescBuilder = 
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(fullTableName)));
+                    if (mutable) {
+                        if 
(admin.getDescriptor(TableName.valueOf(fullTableName)).hasCoprocessor(IndexRegionObserver.class.getName()))
 {
+                            if (!dryRun) {
+                                
dataTableDescBuilder.removeCoprocessor(IndexRegionObserver.class.getName());
+                            }
+                            LOGGER.info("Unloaded IndexRegionObserver 
coprocessor on data table "+fullTableName);
+                        }
+                        if 
(!admin.getDescriptor(TableName.valueOf(fullTableName)).hasCoprocessor(Indexer.class.getName()))
 {
+                            if (!dryRun) {
+                                
dataTableDescBuilder.addCoprocessor(Indexer.class.getName(), null, 
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY, prop);
+                            }
+                            LOGGER.info("Loaded Indexer coprocessor on data 
table "+fullTableName);
+                        }
+                        if (!dryRun) {
+                            admin.modifyTable(dataTableDescBuilder.build());
+                        }
+                    }
+                    for (String indexName : entry.getValue()) {
+                        if (!dryRun) {
+                            admin.disableTable(TableName.valueOf(indexName));
+                        }
+                        LOGGER.info("Disabled index table "+ indexName);
+                        TableDescriptorBuilder indexTableDescBuilder = 
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(indexName)));
+                        if 
(admin.getDescriptor(TableName.valueOf(indexName)).hasCoprocessor(GlobalIndexChecker.class.getName()))
 {
+                            
indexTableDescBuilder.removeCoprocessor(GlobalIndexChecker.class.getName());
+                            LOGGER.info("Unloaded GlobalIndexChecker 
coprocessor on index table "+indexName);
+                        }
+                        if (!dryRun) {
+                            admin.modifyTable(indexTableDescBuilder.build());
+                        }
+                    }
+                    for (String indexName : entry.getValue()) {
+                        if (!dryRun) {
+                            admin.enableTable(TableName.valueOf(indexName));
+                        }
+                        LOGGER.info("Enabled index table "+ indexName);
+                    }
+                    if (!dryRun) {
+                        admin.enableTable(TableName.valueOf(fullTableName));
+                    }
+                    LOGGER.info("Enabled data table "+ fullTableName);
+                } catch (IOException | SQLException e) {
+                    LOGGER.severe("Something went wrong while rolling back the 
feature "+ e);
+                }
+            }
+
+        } catch (SQLException e) {
+            LOGGER.severe("Something went wrong in rollbackConsistencyFeature 
"+ e);
+        }
+    }
+
+    private static void checkAndWaitForMajorCompaction(Admin admin, 
Map.Entry<String, ArrayList<String>> value) {
+        value.getValue().parallelStream().forEach(table -> {
+            try {
+                CompactionState cs = 
admin.getCompactionState(TableName.valueOf(table));
+                while (cs.equals(CompactionState.MAJOR)) {
+                    LOGGER.fine("Major compaction state is " + cs.name() +" 
for "+table+" sleeping for 5 secs before next check");
+                    if (!dryRun) {
+                        Thread.sleep(5000);
+                        cs = 
admin.getCompactionState(TableName.valueOf(table));
+                    }
+                }
+                if (!dryRun) {
+                    LOGGER.fine("Major compaction state is " + cs.name() + " 
for " + table);
+                }
+            } catch (InterruptedException | IOException e) {
+                LOGGER.severe("Something went wrong while checking and waiting 
for Major Compaction for table " + table+e);
+            }
+        });
+    }
+
+    private static void resetMajorCompaction(Admin admin, Map.Entry<String, 
ArrayList<String>> value) {
+        value.getValue().parallelStream().forEach(table -> {
+            try {
+                tableMC.put(table, 
admin.getDescriptor(TableName.valueOf(table)).isCompactionEnabled());
+                if (!dryRun) {
+                    TableDescriptorBuilder tdb = 
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(table)));
+                    tdb.setCompactionEnabled(false);
+                    admin.modifyTable(tdb.build());
+                }
+                LOGGER.info("Major compaction disabled for "+table);
+
+            } catch (IOException e) {
+                LOGGER.severe("Something went wrong while setting Major 
Compaction to false for "+table+" "+e);
+            }
+        });
+    }
+
+    private static void setMajorCompaction(Admin admin, Map.Entry<String, 
ArrayList<String>> value) {
+        value.getValue().parallelStream().forEach(table -> {
+            try {
+                if (!dryRun) {
+                    TableDescriptorBuilder tdb = 
TableDescriptorBuilder.newBuilder(admin.getDescriptor(TableName.valueOf(table)));
+                    tdb.setCompactionEnabled(tableMC.get(table));
+                    admin.modifyTable(tdb.build());
+                }
+                LOGGER.info("Major compaction set to original value 
{"+tableMC.get(table)+"} for "+table);
+            } catch (IOException e) {
+                LOGGER.severe("Something went wrong while setting Major 
Compaction to old value "+table+" "+e);
+            }
+        });
+    }
+
+    private static void rebuildIndexes(Map.Entry<String, ArrayList<String>> 
value, Configuration conf) {
+        String schema = value.getKey().split("\\.")[0];
+        String table = value.getKey().split("\\.")[1];
+
+        value.getValue().stream().forEach(index -> {
 
 Review comment:
   @swaroopak - unfortunately, we don't allow JDK 8 and up features like 
stream() in Phoenix 4.x, because HBase 1.x doesn't allow them. Since this PR is 
bound for master, which is 5.x, it's technically OK here, but you'd have to 
rewrite this for the backport, which is probably more trouble than we want to 
maintain. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to