GitHub user chrajeshbabu commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/210#discussion_r83827883
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---
    @@ -167,50 +180,152 @@ private void printHelpAndExit(Options options, int 
exitCode) {
             formatter.printHelp("help", options);
             System.exit(exitCode);
         }
    +    
    +    class JobFactory {
    +        Connection connection;
    +        Configuration configuration;
    +        private Path outputPath;
     
    -    @Override
    -    public int run(String[] args) throws Exception {
    -        Connection connection = null;
    -        try {
    -            CommandLine cmdLine = null;
    -            try {
    -                cmdLine = parseOptions(args);
    -            } catch (IllegalStateException e) {
    -                printHelpAndExit(e.getMessage(), getOptions());
    +        public JobFactory(Connection connection, Configuration 
configuration, Path outputPath) {
    +            this.connection = connection;
    +            this.configuration = configuration;
    +            this.outputPath = outputPath;
    +
    +        }
    +
    +        public Job getJob(String schemaName, String indexTable, String 
dataTable, boolean useDirectApi) throws Exception {
    +            if (indexTable == null) {
    +                return configureJobForPartialBuild(schemaName, dataTable);
    +            } else {
    +                return configureJobForAysncIndex(schemaName, indexTable, 
dataTable, useDirectApi);
                 }
    -            final Configuration configuration = 
HBaseConfiguration.addHbaseResources(getConf());
    -            final String schemaName = 
cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
    -            final String dataTable = 
cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
    -            final String indexTable = 
cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
    +        }
    +        
    +        private Job configureJobForPartialBuild(String schemaName, String 
dataTable) throws Exception {
                 final String qDataTable = 
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
    -            final String qIndexTable = 
SchemaUtil.getQualifiedTableName(schemaName, indexTable);
    -
    +            final PTable pdataTable = PhoenixRuntime.getTable(connection, 
qDataTable);
                 connection = ConnectionUtil.getInputConnection(configuration);
    -            if (!isValidIndexTable(connection, qDataTable, indexTable)) {
    -                throw new IllegalArgumentException(String.format(
    -                    " %s is not an index table for %s ", qIndexTable, 
qDataTable));
    +            long minDisableTimestamp = HConstants.LATEST_TIMESTAMP;
    +            PTable indexWithMinDisableTimestamp = null;
    +            
    +            //Get Indexes in building state, minDisabledTimestamp 
    +            List<String> disableIndexes = new ArrayList<String>();
    +            List<PTable> disabledPIndexes = new ArrayList<PTable>();
    +            for (PTable index : pdataTable.getIndexes()) {
    +                if (index.getIndexState().equals(PIndexState.BUILDING)) {
    +                    disableIndexes.add(index.getTableName().getString());
    +                    disabledPIndexes.add(index);
    +                    if (minDisableTimestamp > 
index.getIndexDisableTimestamp()) {
    +                        minDisableTimestamp = 
index.getIndexDisableTimestamp();
    +                        indexWithMinDisableTimestamp = index;
    +                    }
    +                }
    +            }
    +            
    +            if (indexWithMinDisableTimestamp == null) {
    +                throw new Exception("There is no index for a datatable to 
be rebuild:" + qDataTable);
                 }
    +            if (minDisableTimestamp == 0) {
    +                throw new Exception("It seems Index " + 
indexWithMinDisableTimestamp
    +                        + " has disable timestamp as 0 , please run 
IndexTool with IndexName to build it first");
    +                // TODO probably we can initiate the job by ourself or can 
skip them while making the list for partial build with a warning
    +            }
    +            
    +            long maxTimestamp = getMaxRebuildAsyncDate(schemaName, 
disableIndexes);
    +            
    +            //serialize index maintaienr in job conf with Base64 TODO: 
Need to find better way to serialize them in conf.
    +            List<IndexMaintainer> maintainers = 
Lists.newArrayListWithExpectedSize(disabledPIndexes.size());
    +            for (PTable index : disabledPIndexes) {
    +                maintainers.add(index.getIndexMaintainer(pdataTable, 
connection.unwrap(PhoenixConnection.class)));
    +            }
    +            ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
    +            IndexMaintainer.serializeAdditional(pdataTable, 
indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
    +            PhoenixConfigurationUtil.setIndexMaintainers(configuration, 
indexMetaDataPtr);
    +            
    +            //Prepare raw scan 
    +            Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
    +            scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
    +            scan.setRaw(true);
    +            scan.setCacheBlocks(false);
    +            if (pdataTable.isTransactional()) {
    +                long maxTimeRange = pdataTable.getTimeStamp() + 1;
    +                scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
    +                        
Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)))));
    +            }
    +            
    +          
    +            String 
physicalTableName=pdataTable.getPhysicalName().getString();
    +            final String jobName = String.format("Phoenix Indexes build 
for " + pdataTable.getName().toString());
    +            
    +            PhoenixConfigurationUtil.setInputTableName(configuration, 
qDataTable);
    +            PhoenixConfigurationUtil.setPhysicalTableName(configuration, 
physicalTableName);
    +            
    +            //TODO: update disable indexes
    +            PhoenixConfigurationUtil.setDisableIndexes(configuration, 
StringUtils.join(",",disableIndexes));
    +            final Job job = Job.getInstance(configuration, jobName);
    +            job.setJarByClass(IndexTool.class);
    +            FileOutputFormat.setOutputPath(job, outputPath);
    +            TableMapReduceUtil.initTableMapperJob(physicalTableName, scan, 
PhoenixIndexPartialBuildMapper.class, null,
    +                    null, job);
    +            TableMapReduceUtil.initCredentials(job);
    +            TableInputFormat.configureSplitTable(job, 
TableName.valueOf(physicalTableName));
    +            return configureSubmittableJobUsingDirectApi(job, true);
    +        }
    +        
    +        private long getMaxRebuildAsyncDate(String schemaName, 
List<String> disableIndexes) throws SQLException {
    +            Long maxRebuilAsyncDate=HConstants.LATEST_TIMESTAMP;
    +            Long maxDisabledTimeStamp=0L;
    +            if (disableIndexes == null || disableIndexes.isEmpty()) { 
return 0; }
    +            List<String> quotedIndexes = new 
ArrayList<String>(disableIndexes.size());
    +            for (String index : disableIndexes) {
    +                quotedIndexes.add("'" + index + "'");
    +            }
    +            ResultSet rs = connection.createStatement()
    +                    .executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP 
+ "),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " ("
    +                            + ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE " 
+ TABLE_SCHEM
    +                            + (schemaName != null && schemaName.length() > 
0 ? "='" + schemaName + "'" : " IS NULL")
    +                            + " and " + TABLE_NAME + " IN (" + 
StringUtils.join(",", quotedIndexes) + ")");
    +            if (rs.next()) {
    +                maxRebuilAsyncDate = rs.getLong(1);
    +                maxDisabledTimeStamp = rs.getLong(2);
    +            }
    +            // Do check if table is disabled again after user invoked 
async rebuilding during the run of the job
    --- End diff --
    
    Please make sure this cannot happen: if an index is disabled again while the rebuild job is running, we may lose some data.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to