[
https://issues.apache.org/jira/browse/PHOENIX-2890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585161#comment-15585161
]
ASF GitHub Bot commented on PHOENIX-2890:
-----------------------------------------
Github user chrajeshbabu commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/210#discussion_r83827883
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---
@@ -167,50 +180,152 @@ private void printHelpAndExit(Options options, int
exitCode) {
formatter.printHelp("help", options);
System.exit(exitCode);
}
+
+ class JobFactory {
+ Connection connection;
+ Configuration configuration;
+ private Path outputPath;
- @Override
- public int run(String[] args) throws Exception {
- Connection connection = null;
- try {
- CommandLine cmdLine = null;
- try {
- cmdLine = parseOptions(args);
- } catch (IllegalStateException e) {
- printHelpAndExit(e.getMessage(), getOptions());
+ public JobFactory(Connection connection, Configuration
configuration, Path outputPath) {
+ this.connection = connection;
+ this.configuration = configuration;
+ this.outputPath = outputPath;
+
+ }
+
+ public Job getJob(String schemaName, String indexTable, String
dataTable, boolean useDirectApi) throws Exception {
+ if (indexTable == null) {
+ return configureJobForPartialBuild(schemaName, dataTable);
+ } else {
+ return configureJobForAysncIndex(schemaName, indexTable,
dataTable, useDirectApi);
}
- final Configuration configuration =
HBaseConfiguration.addHbaseResources(getConf());
- final String schemaName =
cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
- final String dataTable =
cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
- final String indexTable =
cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+ }
+
+ private Job configureJobForPartialBuild(String schemaName, String
dataTable) throws Exception {
final String qDataTable =
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
- final String qIndexTable =
SchemaUtil.getQualifiedTableName(schemaName, indexTable);
-
+ final PTable pdataTable = PhoenixRuntime.getTable(connection,
qDataTable);
connection = ConnectionUtil.getInputConnection(configuration);
- if (!isValidIndexTable(connection, qDataTable, indexTable)) {
- throw new IllegalArgumentException(String.format(
- " %s is not an index table for %s ", qIndexTable,
qDataTable));
+ long minDisableTimestamp = HConstants.LATEST_TIMESTAMP;
+ PTable indexWithMinDisableTimestamp = null;
+
+ //Get Indexes in building state, minDisabledTimestamp
+ List<String> disableIndexes = new ArrayList<String>();
+ List<PTable> disabledPIndexes = new ArrayList<PTable>();
+ for (PTable index : pdataTable.getIndexes()) {
+ if (index.getIndexState().equals(PIndexState.BUILDING)) {
+ disableIndexes.add(index.getTableName().getString());
+ disabledPIndexes.add(index);
+ if (minDisableTimestamp >
index.getIndexDisableTimestamp()) {
+ minDisableTimestamp =
index.getIndexDisableTimestamp();
+ indexWithMinDisableTimestamp = index;
+ }
+ }
+ }
+
+ if (indexWithMinDisableTimestamp == null) {
+ throw new Exception("There is no index for a datatable to
be rebuild:" + qDataTable);
}
+ if (minDisableTimestamp == 0) {
+ throw new Exception("It seems Index " +
indexWithMinDisableTimestamp
+ + " has disable timestamp as 0 , please run
IndexTool with IndexName to build it first");
+ // TODO probably we can initiate the job by ourself or can
skip them while making the list for partial build with a warning
+ }
+
+ long maxTimestamp = getMaxRebuildAsyncDate(schemaName,
disableIndexes);
+
+ //serialize index maintaienr in job conf with Base64 TODO:
Need to find better way to serialize them in conf.
+ List<IndexMaintainer> maintainers =
Lists.newArrayListWithExpectedSize(disabledPIndexes.size());
+ for (PTable index : disabledPIndexes) {
+ maintainers.add(index.getIndexMaintainer(pdataTable,
connection.unwrap(PhoenixConnection.class)));
+ }
+ ImmutableBytesWritable indexMetaDataPtr = new
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+ IndexMaintainer.serializeAdditional(pdataTable,
indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
+ PhoenixConfigurationUtil.setIndexMaintainers(configuration,
indexMetaDataPtr);
+
+ //Prepare raw scan
+ Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
+ scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
+ scan.setRaw(true);
+ scan.setCacheBlocks(false);
+ if (pdataTable.isTransactional()) {
+ long maxTimeRange = pdataTable.getTimeStamp() + 1;
+ scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
+
Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)))));
+ }
+
+
+ String
physicalTableName=pdataTable.getPhysicalName().getString();
+ final String jobName = String.format("Phoenix Indexes build
for " + pdataTable.getName().toString());
+
+ PhoenixConfigurationUtil.setInputTableName(configuration,
qDataTable);
+ PhoenixConfigurationUtil.setPhysicalTableName(configuration,
physicalTableName);
+
+ //TODO: update disable indexes
+ PhoenixConfigurationUtil.setDisableIndexes(configuration,
StringUtils.join(",",disableIndexes));
+ final Job job = Job.getInstance(configuration, jobName);
+ job.setJarByClass(IndexTool.class);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ TableMapReduceUtil.initTableMapperJob(physicalTableName, scan,
PhoenixIndexPartialBuildMapper.class, null,
+ null, job);
+ TableMapReduceUtil.initCredentials(job);
+ TableInputFormat.configureSplitTable(job,
TableName.valueOf(physicalTableName));
+ return configureSubmittableJobUsingDirectApi(job, true);
+ }
+
+ private long getMaxRebuildAsyncDate(String schemaName,
List<String> disableIndexes) throws SQLException {
+ Long maxRebuilAsyncDate=HConstants.LATEST_TIMESTAMP;
+ Long maxDisabledTimeStamp=0L;
+ if (disableIndexes == null || disableIndexes.isEmpty()) {
return 0; }
+ List<String> quotedIndexes = new
ArrayList<String>(disableIndexes.size());
+ for (String index : disableIndexes) {
+ quotedIndexes.add("'" + index + "'");
+ }
+ ResultSet rs = connection.createStatement()
+ .executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP
+ "),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " ("
+ + ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE "
+ TABLE_SCHEM
+ + (schemaName != null && schemaName.length() >
0 ? "='" + schemaName + "'" : " IS NULL")
+ + " and " + TABLE_NAME + " IN (" +
StringUtils.join(",", quotedIndexes) + ")");
+ if (rs.next()) {
+ maxRebuilAsyncDate = rs.getLong(1);
+ maxDisabledTimeStamp = rs.getLong(2);
+ }
+ // Do check if table is disabled again after user invoked
async rebuilding during the run of the job
--- End diff --
Make sure this cannot happen; if it does, we may lose some data.
> Extend IndexTool to allow incremental index rebuilds
> ----------------------------------------------------
>
> Key: PHOENIX-2890
> URL: https://issues.apache.org/jira/browse/PHOENIX-2890
> Project: Phoenix
> Issue Type: Improvement
> Reporter: Ankit Singhal
> Assignee: Ankit Singhal
> Priority: Minor
> Fix For: 4.9.0
>
> Attachments: PHOENIX-2890.patch, PHOENIX-2890_wip.patch
>
>
> Currently, IndexTool is used for the initial index rebuild, but I think we should
> extend it to be used for recovering an index from its last disabled timestamp too.
> In general terms, if we run IndexTool on an already existing or new index, it
> should follow the same semantics as the background index rebuilding
> thread.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)