[ 
https://issues.apache.org/jira/browse/PHOENIX-2890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585161#comment-15585161
 ] 

ASF GitHub Bot commented on PHOENIX-2890:
-----------------------------------------

Github user chrajeshbabu commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/210#discussion_r83827883
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---
    @@ -167,50 +180,152 @@ private void printHelpAndExit(Options options, int 
exitCode) {
             formatter.printHelp("help", options);
             System.exit(exitCode);
         }
    +    
    +    class JobFactory {
    +        Connection connection;
    +        Configuration configuration;
    +        private Path outputPath;
     
    -    @Override
    -    public int run(String[] args) throws Exception {
    -        Connection connection = null;
    -        try {
    -            CommandLine cmdLine = null;
    -            try {
    -                cmdLine = parseOptions(args);
    -            } catch (IllegalStateException e) {
    -                printHelpAndExit(e.getMessage(), getOptions());
    +        public JobFactory(Connection connection, Configuration 
configuration, Path outputPath) {
    +            this.connection = connection;
    +            this.configuration = configuration;
    +            this.outputPath = outputPath;
    +
    +        }
    +
    +        public Job getJob(String schemaName, String indexTable, String 
dataTable, boolean useDirectApi) throws Exception {
    +            if (indexTable == null) {
    +                return configureJobForPartialBuild(schemaName, dataTable);
    +            } else {
    +                return configureJobForAysncIndex(schemaName, indexTable, 
dataTable, useDirectApi);
                 }
    -            final Configuration configuration = 
HBaseConfiguration.addHbaseResources(getConf());
    -            final String schemaName = 
cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
    -            final String dataTable = 
cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
    -            final String indexTable = 
cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
    +        }
    +        
    +        private Job configureJobForPartialBuild(String schemaName, String 
dataTable) throws Exception {
                 final String qDataTable = 
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
    -            final String qIndexTable = 
SchemaUtil.getQualifiedTableName(schemaName, indexTable);
    -
    +            final PTable pdataTable = PhoenixRuntime.getTable(connection, 
qDataTable);
                 connection = ConnectionUtil.getInputConnection(configuration);
    -            if (!isValidIndexTable(connection, qDataTable, indexTable)) {
    -                throw new IllegalArgumentException(String.format(
    -                    " %s is not an index table for %s ", qIndexTable, 
qDataTable));
    +            long minDisableTimestamp = HConstants.LATEST_TIMESTAMP;
    +            PTable indexWithMinDisableTimestamp = null;
    +            
    +            //Get Indexes in building state, minDisabledTimestamp 
    +            List<String> disableIndexes = new ArrayList<String>();
    +            List<PTable> disabledPIndexes = new ArrayList<PTable>();
    +            for (PTable index : pdataTable.getIndexes()) {
    +                if (index.getIndexState().equals(PIndexState.BUILDING)) {
    +                    disableIndexes.add(index.getTableName().getString());
    +                    disabledPIndexes.add(index);
    +                    if (minDisableTimestamp > 
index.getIndexDisableTimestamp()) {
    +                        minDisableTimestamp = 
index.getIndexDisableTimestamp();
    +                        indexWithMinDisableTimestamp = index;
    +                    }
    +                }
    +            }
    +            
    +            if (indexWithMinDisableTimestamp == null) {
    +                throw new Exception("There is no index for a datatable to 
be rebuild:" + qDataTable);
                 }
    +            if (minDisableTimestamp == 0) {
    +                throw new Exception("It seems Index " + 
indexWithMinDisableTimestamp
    +                        + " has disable timestamp as 0 , please run 
IndexTool with IndexName to build it first");
    +                // TODO probably we can initiate the job by ourself or can 
skip them while making the list for partial build with a warning
    +            }
    +            
    +            long maxTimestamp = getMaxRebuildAsyncDate(schemaName, 
disableIndexes);
    +            
    +            //serialize index maintaienr in job conf with Base64 TODO: 
Need to find better way to serialize them in conf.
    +            List<IndexMaintainer> maintainers = 
Lists.newArrayListWithExpectedSize(disabledPIndexes.size());
    +            for (PTable index : disabledPIndexes) {
    +                maintainers.add(index.getIndexMaintainer(pdataTable, 
connection.unwrap(PhoenixConnection.class)));
    +            }
    +            ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
    +            IndexMaintainer.serializeAdditional(pdataTable, 
indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
    +            PhoenixConfigurationUtil.setIndexMaintainers(configuration, 
indexMetaDataPtr);
    +            
    +            //Prepare raw scan 
    +            Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
    +            scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
    +            scan.setRaw(true);
    +            scan.setCacheBlocks(false);
    +            if (pdataTable.isTransactional()) {
    +                long maxTimeRange = pdataTable.getTimeStamp() + 1;
    +                scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
    +                        
Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)))));
    +            }
    +            
    +          
    +            String 
physicalTableName=pdataTable.getPhysicalName().getString();
    +            final String jobName = String.format("Phoenix Indexes build 
for " + pdataTable.getName().toString());
    +            
    +            PhoenixConfigurationUtil.setInputTableName(configuration, 
qDataTable);
    +            PhoenixConfigurationUtil.setPhysicalTableName(configuration, 
physicalTableName);
    +            
    +            //TODO: update disable indexes
    +            PhoenixConfigurationUtil.setDisableIndexes(configuration, 
StringUtils.join(",",disableIndexes));
    +            final Job job = Job.getInstance(configuration, jobName);
    +            job.setJarByClass(IndexTool.class);
    +            FileOutputFormat.setOutputPath(job, outputPath);
    +            TableMapReduceUtil.initTableMapperJob(physicalTableName, scan, 
PhoenixIndexPartialBuildMapper.class, null,
    +                    null, job);
    +            TableMapReduceUtil.initCredentials(job);
    +            TableInputFormat.configureSplitTable(job, 
TableName.valueOf(physicalTableName));
    +            return configureSubmittableJobUsingDirectApi(job, true);
    +        }
    +        
    +        private long getMaxRebuildAsyncDate(String schemaName, 
List<String> disableIndexes) throws SQLException {
    +            Long maxRebuilAsyncDate=HConstants.LATEST_TIMESTAMP;
    +            Long maxDisabledTimeStamp=0L;
    +            if (disableIndexes == null || disableIndexes.isEmpty()) { 
return 0; }
    +            List<String> quotedIndexes = new 
ArrayList<String>(disableIndexes.size());
    +            for (String index : disableIndexes) {
    +                quotedIndexes.add("'" + index + "'");
    +            }
    +            ResultSet rs = connection.createStatement()
    +                    .executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP 
+ "),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " ("
    +                            + ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE " 
+ TABLE_SCHEM
    +                            + (schemaName != null && schemaName.length() > 
0 ? "='" + schemaName + "'" : " IS NULL")
    +                            + " and " + TABLE_NAME + " IN (" + 
StringUtils.join(",", quotedIndexes) + ")");
    +            if (rs.next()) {
    +                maxRebuilAsyncDate = rs.getLong(1);
    +                maxDisabledTimeStamp = rs.getLong(2);
    +            }
    +            // Do check if table is disabled again after user invoked 
async rebuilding during the run of the job
    --- End diff --
    
    Make sure this cannot happen; if it does, we may lose some data.


> Extend IndexTool to allow incremental index rebuilds
> ----------------------------------------------------
>
>                 Key: PHOENIX-2890
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-2890
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: Ankit Singhal
>            Assignee: Ankit Singhal
>            Priority: Minor
>             Fix For: 4.9.0
>
>         Attachments: PHOENIX-2890.patch, PHOENIX-2890_wip.patch
>
>
> Currently, IndexTool is used for the initial index rebuild, but I think we 
> should extend it to also recover an index from its last disabled timestamp. 
> In general terms, if we run IndexTool on an already existing or new index, it 
> should follow the same semantics as the background index rebuilding 
> thread.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to