[ https://issues.apache.org/jira/browse/PHOENIX-2890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585161#comment-15585161 ]
ASF GitHub Bot commented on PHOENIX-2890: ----------------------------------------- Github user chrajeshbabu commented on a diff in the pull request: https://github.com/apache/phoenix/pull/210#discussion_r83827883 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java --- @@ -167,50 +180,152 @@ private void printHelpAndExit(Options options, int exitCode) { formatter.printHelp("help", options); System.exit(exitCode); } + + class JobFactory { + Connection connection; + Configuration configuration; + private Path outputPath; - @Override - public int run(String[] args) throws Exception { - Connection connection = null; - try { - CommandLine cmdLine = null; - try { - cmdLine = parseOptions(args); - } catch (IllegalStateException e) { - printHelpAndExit(e.getMessage(), getOptions()); + public JobFactory(Connection connection, Configuration configuration, Path outputPath) { + this.connection = connection; + this.configuration = configuration; + this.outputPath = outputPath; + + } + + public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi) throws Exception { + if (indexTable == null) { + return configureJobForPartialBuild(schemaName, dataTable); + } else { + return configureJobForAysncIndex(schemaName, indexTable, dataTable, useDirectApi); } - final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf()); - final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); - final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); - final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); + } + + private Job configureJobForPartialBuild(String schemaName, String dataTable) throws Exception { final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); - final String qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable); - + final PTable pdataTable = PhoenixRuntime.getTable(connection, 
qDataTable); connection = ConnectionUtil.getInputConnection(configuration); - if (!isValidIndexTable(connection, qDataTable, indexTable)) { - throw new IllegalArgumentException(String.format( - " %s is not an index table for %s ", qIndexTable, qDataTable)); + long minDisableTimestamp = HConstants.LATEST_TIMESTAMP; + PTable indexWithMinDisableTimestamp = null; + + //Get Indexes in building state, minDisabledTimestamp + List<String> disableIndexes = new ArrayList<String>(); + List<PTable> disabledPIndexes = new ArrayList<PTable>(); + for (PTable index : pdataTable.getIndexes()) { + if (index.getIndexState().equals(PIndexState.BUILDING)) { + disableIndexes.add(index.getTableName().getString()); + disabledPIndexes.add(index); + if (minDisableTimestamp > index.getIndexDisableTimestamp()) { + minDisableTimestamp = index.getIndexDisableTimestamp(); + indexWithMinDisableTimestamp = index; + } + } + } + + if (indexWithMinDisableTimestamp == null) { + throw new Exception("There is no index for a datatable to be rebuild:" + qDataTable); } + if (minDisableTimestamp == 0) { + throw new Exception("It seems Index " + indexWithMinDisableTimestamp + + " has disable timestamp as 0 , please run IndexTool with IndexName to build it first"); + // TODO probably we can initiate the job by ourself or can skip them while making the list for partial build with a warning + } + + long maxTimestamp = getMaxRebuildAsyncDate(schemaName, disableIndexes); + + //serialize index maintaienr in job conf with Base64 TODO: Need to find better way to serialize them in conf. 
+ List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(disabledPIndexes.size()); + for (PTable index : disabledPIndexes) { + maintainers.add(index.getIndexMaintainer(pdataTable, connection.unwrap(PhoenixConnection.class))); + } + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); + IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class)); + PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr); + + //Prepare raw scan + Scan scan = IndexManagementUtil.newLocalStateScan(maintainers); + scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp); + scan.setRaw(true); + scan.setCacheBlocks(false); + if (pdataTable.isTransactional()) { + long maxTimeRange = pdataTable.getTimeStamp() + 1; + scan.setAttribute(BaseScannerRegionObserver.TX_SCN, + Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))))); + } + + + String physicalTableName=pdataTable.getPhysicalName().getString(); + final String jobName = String.format("Phoenix Indexes build for " + pdataTable.getName().toString()); + + PhoenixConfigurationUtil.setInputTableName(configuration, qDataTable); + PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalTableName); + + //TODO: update disable indexes + PhoenixConfigurationUtil.setDisableIndexes(configuration, StringUtils.join(",",disableIndexes)); + final Job job = Job.getInstance(configuration, jobName); + job.setJarByClass(IndexTool.class); + FileOutputFormat.setOutputPath(job, outputPath); + TableMapReduceUtil.initTableMapperJob(physicalTableName, scan, PhoenixIndexPartialBuildMapper.class, null, + null, job); + TableMapReduceUtil.initCredentials(job); + TableInputFormat.configureSplitTable(job, TableName.valueOf(physicalTableName)); + return configureSubmittableJobUsingDirectApi(job, true); + } + + private long getMaxRebuildAsyncDate(String schemaName, 
List<String> disableIndexes) throws SQLException { + Long maxRebuilAsyncDate=HConstants.LATEST_TIMESTAMP; + Long maxDisabledTimeStamp=0L; + if (disableIndexes == null || disableIndexes.isEmpty()) { return 0; } + List<String> quotedIndexes = new ArrayList<String>(disableIndexes.size()); + for (String index : disableIndexes) { + quotedIndexes.add("'" + index + "'"); + } + ResultSet rs = connection.createStatement() + .executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP + "),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " (" + + ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE " + TABLE_SCHEM + + (schemaName != null && schemaName.length() > 0 ? "='" + schemaName + "'" : " IS NULL") + + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")"); + if (rs.next()) { + maxRebuilAsyncDate = rs.getLong(1); + maxDisabledTimeStamp = rs.getLong(2); + } + // Do check if table is disabled again after user invoked async rebuilding during the run of the job --- End diff -- Make sure this does not happen; if it does, we may lose some data. > Extend IndexTool to allow incremental index rebuilds > ---------------------------------------------------- > > Key: PHOENIX-2890 > URL: https://issues.apache.org/jira/browse/PHOENIX-2890 > Project: Phoenix > Issue Type: Improvement > Reporter: Ankit Singhal > Assignee: Ankit Singhal > Priority: Minor > Fix For: 4.9.0 > > Attachments: PHOENIX-2890.patch, PHOENIX-2890_wip.patch > > > Currently, IndexTool is used for the initial index rebuild, but I think we should > extend it to be used for recovering the index from the last disabled timestamp too. > In general terms, if we run IndexTool on an already existing/new index, then it > should follow the same semantics as followed by the background Index rebuilding > thread. -- This message was sent by Atlassian JIRA (v6.3.4#6332)