gjacoby126 commented on a change in pull request #457: PHOENIX-5190 Implement
TaskRegionObserver for Index rebuild
URL: https://github.com/apache/phoenix/pull/457#discussion_r265719717
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
##########
@@ -298,4 +493,132 @@ public void run() {
}
}
}
+
+ /**
+ * Task runs periodically to rebuild indexes for System.Task entries.
+ *
+ */
+ public static class IndexRebuildTask extends SelfHealingTask {
+
+ public IndexRebuildTask(RegionCoprocessorEnvironment env, long
timeMaxInterval) {
+ super.init(env, timeMaxInterval);
+ }
+
+ public static String[] getArgValues(boolean directApi, boolean
useSnapshot, String schemaName,
+ String dataTable, String indxTable, String tenantId) {
+ final List<String> args = Lists.newArrayList();
+ if (schemaName != null) {
+ args.add("-s");
+ args.add(schemaName);
+ }
+ args.add("-dt");
+ args.add(dataTable);
+ args.add("-it");
+ args.add(indxTable);
+ if (directApi) {
+ args.add("-direct");
+ // Need to run this job in foreground for the test to be
deterministic
+ args.add("-runfg");
+ }
+
+ if (useSnapshot) {
+ args.add("-snap");
+ }
+
+ if (tenantId != null) {
+ args.add("-tenant");
+ args.add(tenantId);
+ }
+
+ args.add("-op");
+ args.add("/tmp/" + UUID.randomUUID().toString());
+ return args.toArray(new String[0]);
+ }
+
+ @Override
+ public void run() {
+ PhoenixConnection connForTask = null;
+ String tenantId = null;
+ String schemaName= null;
+ String tableName = null;
+ String indexName = null;
+ try {
+ connForTask =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+
+ // We have to clone the configuration because
env.getConfiguration is readonly.
+ Configuration conf =
HBaseConfiguration.create(env.getConfiguration());
+ conf.set(QueryServices.TRANSACTIONS_ENABLED,
Boolean.TRUE.toString());
+ IndexTool indexingTool = new IndexTool();
+ indexingTool.setConf(conf);
+ String[] excludeStatues = new String[] {
PTable.TaskStatus.FAILED.toString(),
+ PTable.TaskStatus.COMPLETE.toString() };
+ ResultSet rs = queryTaskTable(connForTask,
TaskType.INDEX_REBUILD, excludeStatues);
+ while (rs.next()) {
+ try {
+ TaskRecord taskRecord = parseResult(rs);
+ tenantId = taskRecord.getTenantId();
+ schemaName= taskRecord.getSchemaName();
+ tableName = taskRecord.getTableName();
+ indexName = taskRecord.getIndexName();
+ final String[] cmdArgs =
+ getArgValues(true, false, schemaName,
tableName, indexName, tenantId);
+
+ // Change task status to STARTED
+ addTask(connForTask, TaskType.INDEX_REBUILD, tenantId,
schemaName,
+ tableName, indexName,
PTable.TaskStatus.STARTED.toString(), taskRecord.data, null, null,true);
+
+ String data = taskRecord.data;
+ if (Strings.isNullOrEmpty(taskRecord.data)) {
+ data = "{}";
+ }
+ JsonParser jsonParser = new JsonParser();
+ JsonObject jsonObject =
jsonParser.parse(data).getAsJsonObject();
+ if (jsonObject.has("DisableBefore")) {
+ String disableBefore =
jsonObject.get("DisableBefore").toString();
Review comment:
This seems like logic that should be in the IndexTool itself.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services