This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
new 9aa8f3a Implement TaskRegionObserver for Index rebuild
9aa8f3a is described below
commit 9aa8f3a7bee08a6aeea614bbadd1dd32006eaedc
Author: Gokcen Iskender <[email protected]>
AuthorDate: Wed Mar 6 09:58:21 2019 -0800
Implement TaskRegionObserver for Index rebuild
Signed-off-by: Geoffrey Jacoby <[email protected]>
---
.../phoenix/end2end/DropTableWithViewsIT.java | 37 ++-
.../apache/phoenix/end2end/IndexRebuildTaskIT.java | 174 ++++++++++
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 3 +-
.../phoenix/coprocessor/TaskRegionObserver.java | 299 ++++++++---------
.../apache/phoenix/coprocessor/tasks/BaseTask.java | 17 +
.../coprocessor/tasks/DropChildViewsTask.java | 81 +++++
.../coprocessor/tasks/IndexRebuildTask.java | 151 +++++++++
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 5 +
.../apache/phoenix/mapreduce/index/IndexTool.java | 55 ++-
.../index/PhoenixIndexImportDirectReducer.java | 43 +++
.../phoenix/query/ConnectionQueryServicesImpl.java | 30 +-
.../org/apache/phoenix/query/QueryConstants.java | 6 +
.../java/org/apache/phoenix/schema/PTable.java | 26 +-
.../java/org/apache/phoenix/schema/task/Task.java | 369 +++++++++++++++++++++
14 files changed, 1115 insertions(+), 181 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
index 6aaf703..bf2633a 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
@@ -18,12 +18,16 @@
package org.apache.phoenix.end2end;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collection;
@@ -103,6 +107,9 @@ public class DropTableWithViewsIT extends
SplitSystemCatalogIT {
try (Connection conn = DriverManager.getConnection(getUrl());
Connection viewConn =
isMultiTenant ?
DriverManager.getConnection(TENANT_SPECIFIC_URL1) : conn) {
+ // Empty the task table first.
+ conn.createStatement().execute("DELETE " + " FROM " +
PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+
String ddlFormat =
"CREATE TABLE IF NOT EXISTS " + baseTable + " ("
+ " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2
VARCHAR "
@@ -126,16 +133,14 @@ public class DropTableWithViewsIT extends
SplitSystemCatalogIT {
// Run DropChildViewsTask to complete the tasks for dropping child
views. The depth of the view tree is 2,
// so we expect that this will be done in two task handling runs
as each non-root level will be processed
// in one run
- TaskRegionObserver.DropChildViewsTask task =
- new TaskRegionObserver.DropChildViewsTask(
+ TaskRegionObserver.SelfHealingTask task =
+ new TaskRegionObserver.SelfHealingTask(
TaskRegionEnvironment,
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
task.run();
task.run();
- ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
- " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
- " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
- PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue());
- assertFalse(rs.next());
+
+ assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(),
PTable.TaskType.DROP_CHILD_VIEWS, null);
+
// Views should be dropped by now
TableName linkTable =
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES);
TableViewFinderResult childViewsResult = new
TableViewFinderResult();
@@ -147,9 +152,25 @@ public class DropTableWithViewsIT extends
SplitSystemCatalogIT {
childViewsResult);
assertTrue(childViewsResult.getLinks().size() == 0);
// There should not be any orphan views
- rs = conn.createStatement().executeQuery("SELECT * FROM " +
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM
" + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
" WHERE " + PhoenixDatabaseMetaData.TABLE_SCHEM + " = '" +
SCHEMA2 +"'");
assertFalse(rs.next());
}
}
+
+ public static void assertTaskColumns(Connection conn, String
expectedStatus, PTable.TaskType taskType, String expectedData)
+ throws SQLException {
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
+ " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+ " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
+ taskType.getSerializedValue());
+ assertTrue(rs.next());
+ String taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS);
+ assertEquals(expectedStatus, taskStatus);
+
+ if (expectedData != null) {
+ String data = rs.getString(PhoenixDatabaseMetaData.TASK_DATA);
+ assertEquals(expectedData, data);
+ }
+ }
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
new file mode 100644
index 0000000..3c53808
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
+ protected static String TENANT1 = "tenant1";
+ private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = Collections.emptyMap();
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+ TaskRegionEnvironment =(RegionCoprocessorEnvironment) getUtility()
+ .getRSForFirstRegionInTable(
+ PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+
.getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+ .get(0).getCoprocessorHost()
+
.findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+ }
+
+ private String generateDDL(String format) {
+ StringBuilder optionsBuilder = new StringBuilder();
+
+ if (optionsBuilder.length() != 0) optionsBuilder.append(",");
+ optionsBuilder.append("MULTI_TENANT=true");
+
+ return String.format(format, "TENANT_ID VARCHAR NOT NULL, ",
"TENANT_ID, ", optionsBuilder.toString());
+ }
+
+ @Test
+ public void testIndexRebuildTask() throws Throwable {
+ String baseTable = generateUniqueName();
+ Connection conn = null;
+ Connection viewConn = null;
+ try {
+ conn = DriverManager.getConnection(getUrl());
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, TENANT1);
+
+ viewConn =DriverManager.getConnection(getUrl(), props);
+ String ddlFormat =
+ "CREATE TABLE IF NOT EXISTS " + baseTable + " ("
+ + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2
VARCHAR "
+ + " CONSTRAINT NAME_PK PRIMARY KEY (%s PK2)" + " )
%s";
+ conn.createStatement().execute(generateDDL(ddlFormat));
+ conn.commit();
+ // Create a view
+ String viewName = generateUniqueName();
+ String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM "
+ baseTable;
+ viewConn.createStatement().execute(viewDDL);
+
+ // Create index
+ String indexName = generateUniqueName();
+ String idxSDDL = String.format("CREATE INDEX %s ON %s (V1)",
indexName, viewName);
+
+ viewConn.createStatement().execute(idxSDDL);
+
+ // Insert rows
+ int numOfValues = 1;
+ for (int i=0; i < numOfValues; i++){
+ viewConn.createStatement().execute(
+ String.format("UPSERT INTO %s VALUES('%s', '%s',
'%s')", viewName, String.valueOf(i), "y",
+ "z"));
+ }
+ viewConn.commit();
+
+ String data = "{IndexName:" + indexName + "}";
+ // Run IndexRebuildTask
+ TaskRegionObserver.SelfHealingTask task =
+ new TaskRegionObserver.SelfHealingTask(
+ TaskRegionEnvironment,
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+
+ Timestamp startTs = new
Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+ // Add a task to System.Task to build indexes
+ Task.addTask(conn.unwrap(PhoenixConnection.class),
PTable.TaskType.INDEX_REBUILD,
+ TENANT1, null, viewName,
+ PTable.TaskStatus.CREATED.toString(), data, null, startTs,
null, true);
+
+
+ task.run();
+
+ String viewIndexTableName =
MetaDataUtil.getViewIndexPhysicalName(baseTable);
+ ConnectionQueryServices queryServices =
conn.unwrap(PhoenixConnection.class).getQueryServices();
+ int count =
getUtility().countRows(queryServices.getTable(Bytes.toBytes(viewIndexTableName)));
+ assertTrue(count == numOfValues);
+
+
+ // Remove index contents and try again
+ Admin admin = queryServices.getAdmin();
+ TableName tableName = TableName.valueOf(viewIndexTableName);
+ admin.disableTable(tableName);
+ admin.truncateTable(tableName, false);
+
+ data = "{IndexName:" + indexName + ", DisableBefore:true}";
+
+ // Add a new task (update status to created) to System.Task to
rebuild indexes
+ Task.addTask(conn.unwrap(PhoenixConnection.class),
PTable.TaskType.INDEX_REBUILD,
+ TENANT1, null, viewName,
+ PTable.TaskStatus.CREATED.toString(), data, null, startTs,
null, true);
+ task.run();
+
+ Thread.sleep(15000);
+
+ Table systemHTable=
queryServices.getTable(Bytes.toBytes("SYSTEM."+PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE));
+ count = getUtility().countRows(systemHTable);
+ assertEquals(1, count);
+
+ // Check task status and other column values.
+ DropTableWithViewsIT.assertTaskColumns(conn,
PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.INDEX_REBUILD,
+ null);
+
+ // See that index is rebuilt and confirm index has rows
+ Table htable=
queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+ count = getUtility().countRows(htable);
+ assertEquals(numOfValues, count);
+ } finally {
+ conn.createStatement().execute("DELETE " + " FROM " +
PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+ conn.commit();
+ if (conn != null) {
+ conn.close();
+ }
+ if (viewConn != null) {
+ viewConn.close();
+ }
+ }
+ }
+}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 7d1ae2b..141f637 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -225,6 +225,7 @@ import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
@@ -2819,7 +2820,7 @@ public class MetaDataEndpointImpl extends
MetaDataProtocol implements Coprocesso
}
try {
PhoenixConnection conn =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
- TaskRegionObserver.addTask(conn,
PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId),
+ Task.addTask(conn,
PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId),
Bytes.toString(schemaName),
Bytes.toString(tableName), this.accessCheckEnabled);
} catch (Throwable t) {
logger.error("Adding a task to drop child views
failed!", t);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 87f7414..7d00f46 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -18,20 +18,24 @@
package org.apache.phoenix.coprocessor;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
+import java.lang.reflect.Method;
import java.sql.SQLException;
import java.sql.Timestamp;
-import java.sql.Types;
-import java.util.Properties;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -39,21 +43,17 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.ipc.RpcServer.Call;
-import org.apache.hadoop.hbase.ipc.RpcUtil;
-import org.apache.hadoop.hbase.security.User;
+
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.TaskType;
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
@@ -64,11 +64,50 @@ import org.apache.phoenix.util.QueryUtil;
public class TaskRegionObserver extends BaseRegionObserver {
public static final Log LOG = LogFactory.getLog(TaskRegionObserver.class);
+
protected ScheduledThreadPoolExecutor executor = new
ScheduledThreadPoolExecutor(TaskType.values().length);
private long timeInterval =
QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS;
private long timeMaxInterval =
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS;
@GuardedBy("TaskRegionObserver.class")
- private long initialDelay;
+ private long initialDelay =
QueryServicesOptions.DEFAULT_TASK_HANDLING_INITIAL_DELAY_MS;
+
+ private static Map<TaskType, String> classMap = ImmutableMap.<TaskType,
String>builder()
+ .put(TaskType.DROP_CHILD_VIEWS,
"org.apache.phoenix.coprocessor.tasks.DropChildViewsTask")
+ .put(TaskType.INDEX_REBUILD,
"org.apache.phoenix.coprocessor.tasks.IndexRebuildTask")
+ .build();
+
+ public enum TaskResultCode {
+ SUCCESS,
+ FAIL,
+ SKIPPED,
+ }
+
+ public static class TaskResult {
+ private TaskResultCode resultCode;
+ private String details;
+
+ public TaskResult(TaskResultCode resultCode, String details) {
+ this.resultCode = resultCode;
+ this.details = details;
+ }
+
+ public TaskResultCode getResultCode() {
+ return resultCode;
+ }
+
+ public String getDetails() {
+ return details;
+ }
+
+ @Override
+ public String toString() {
+ String result = resultCode.name();
+ if (!Strings.isNullOrEmpty(details)) {
+ result = result + " - " + details;
+ }
+ return result;
+ }
+ }
@Override
public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -103,102 +142,16 @@ public class TaskRegionObserver extends
BaseRegionObserver {
deprecationLogger.setLevel(Level.WARN);
}
- DropChildViewsTask task = new DropChildViewsTask(e.getEnvironment(),
timeMaxInterval);
+ SelfHealingTask task = new SelfHealingTask(e.getEnvironment(),
timeMaxInterval);
executor.scheduleWithFixedDelay(task, initialDelay, timeInterval,
TimeUnit.MILLISECONDS);
}
- private static void mutateSystemTaskTable(final PhoenixConnection conn,
final PreparedStatement stmt, boolean accessCheckEnabled)
- throws IOException {
- // we need to mutate SYSTEM.TASK with HBase/login user if access is
enabled.
- if (accessCheckEnabled) {
- User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- final Call rpcContext = RpcUtil.getRpcContext();
- // setting RPC context as null so that user can be reset
- try {
- RpcUtil.setRpcContext(null);
- stmt.execute();
- conn.commit();
- } catch (SQLException e) {
- throw new IOException(e);
- } finally {
- // setting RPC context back to original context of the
RPC
- RpcUtil.setRpcContext(rpcContext);
- }
- return null;
- }
- });
- }
- else {
- try {
- stmt.execute();
- conn.commit();
- } catch (SQLException e) {
- throw new IOException(e);
- }
- }
- }
+ public static class SelfHealingTask extends TimerTask {
+ protected RegionCoprocessorEnvironment env;
+ protected long timeMaxInterval;
+ protected boolean accessCheckEnabled;
- public static void addTask(PhoenixConnection conn, TaskType taskType,
String tenantId, String schemaName,
- String tableName, boolean accessCheckEnabled)
- throws IOException {
- PreparedStatement stmt = null;
- try {
- stmt = conn.prepareStatement("UPSERT INTO " +
- PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
- PhoenixDatabaseMetaData.TASK_TYPE + ", " +
- PhoenixDatabaseMetaData.TENANT_ID + ", " +
- PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
- PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)");
- stmt.setByte(1, taskType.getSerializedValue());
- if (tenantId != null) {
- stmt.setString(2, tenantId);
- } else {
- stmt.setNull(2, Types.VARCHAR);
- }
- if (schemaName != null) {
- stmt.setString(3, schemaName);
- } else {
- stmt.setNull(3, Types.VARCHAR);
- }
- stmt.setString(4, tableName);
- } catch (SQLException e) {
- throw new IOException(e);
- }
- mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
- }
-
- public static void deleteTask(PhoenixConnection conn, TaskType taskType,
Timestamp ts, String tenantId,
- String schemaName, String tableName, boolean
accessCheckEnabled) throws IOException {
- PreparedStatement stmt = null;
- try {
- stmt = conn.prepareStatement("DELETE FROM " +
- PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
- " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND
" +
- PhoenixDatabaseMetaData.TASK_TS + " = ? AND " +
- PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? "
IS NULL " : " = '" + tenantId + "'") + " AND " +
- PhoenixDatabaseMetaData.TABLE_SCHEM + (schemaName == null
? " IS NULL " : " = '" + schemaName + "'") + " AND " +
- PhoenixDatabaseMetaData.TABLE_NAME + " = ?");
- stmt.setByte(1, taskType.getSerializedValue());
- stmt.setTimestamp(2, ts);
- stmt.setString(3, tableName);
- } catch (SQLException e) {
- throw new IOException(e);
- }
- mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
- }
-
- /**
- * Task runs periodically to clean up task of child views whose parent is
dropped
- *
- */
- public static class DropChildViewsTask extends TimerTask {
- private RegionCoprocessorEnvironment env;
- private long timeMaxInterval;
- private boolean accessCheckEnabled;
-
- public DropChildViewsTask(RegionCoprocessorEnvironment env, long
timeMaxInterval) {
+ public SelfHealingTask(RegionCoprocessorEnvironment env, long
timeMaxInterval) {
this.env = env;
this.accessCheckEnabled =
env.getConfiguration().getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
@@ -208,88 +161,106 @@ public class TaskRegionObserver extends
BaseRegionObserver {
@Override
public void run() {
PhoenixConnection connForTask = null;
- Timestamp timestamp = null;
- String tenantId = null;
- byte[] tenantIdBytes;
- String schemaName= null;
- byte[] schemaNameBytes;
- String tableName = null;
- byte[] tableNameBytes;
- PhoenixConnection pconn;
try {
- String taskQuery = "SELECT " +
- PhoenixDatabaseMetaData.TASK_TS + ", " +
- PhoenixDatabaseMetaData.TENANT_ID + ", " +
- PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
- PhoenixDatabaseMetaData.TABLE_NAME +
- " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
- " WHERE "+ PhoenixDatabaseMetaData.TASK_TYPE + " = " +
PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue();
-
connForTask =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
- PreparedStatement taskStatement =
connForTask.prepareStatement(taskQuery);
- ResultSet rs = taskStatement.executeQuery();
- while (rs.next()) {
+ String[] excludeStates = new String[] {
PTable.TaskStatus.FAILED.toString(),
+ PTable.TaskStatus.COMPLETED.toString() };
+ List<Task.TaskRecord> taskRecords =
Task.queryTaskTable(connForTask, excludeStates);
+ for (Task.TaskRecord taskRecord : taskRecords){
try {
- // delete child views only if the parent table is
deleted from the system catalog
- timestamp = rs.getTimestamp(1);
- tenantId = rs.getString(2);
- tenantIdBytes= rs.getBytes(2);
- schemaName= rs.getString(3);
- schemaNameBytes = rs.getBytes(3);
- tableName= rs.getString(4);
- tableNameBytes = rs.getBytes(4);
-
- if (tenantId != null) {
- Properties tenantProps = new Properties();
-
tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
- pconn =
QueryUtil.getConnectionOnServer(tenantProps,
env.getConfiguration()).unwrap(PhoenixConnection.class);
-
+ TaskType taskType = taskRecord.getTaskType();
+ if (!classMap.containsKey(taskType)) {
+ LOG.warn("Don't know how to execute task type: " +
taskType.name());
+ continue;
}
- else {
- pconn =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+
+ String className = classMap.get(taskType);
+
+ Class<?> concreteClass = Class.forName(className);
+
+ Object obj = concreteClass.newInstance();
+ Method runMethod =
concreteClass.getDeclaredMethod("run",
+ Task.TaskRecord.class);
+ Method checkCurretResult =
concreteClass.getDeclaredMethod("checkCurrentResult", Task.TaskRecord.class);
+ Method initMethod =
concreteClass.getSuperclass().getDeclaredMethod("init",
+ RegionCoprocessorEnvironment.class,
Long.class);
+ initMethod.invoke(obj, env, timeMaxInterval);
+
+ // if current status is already Started, check if we
need to re-run.
+ // Task can be async and already Started before.
+ TaskResult result = null;
+ if (taskRecord.getStatus() != null &&
taskRecord.getStatus().equals(PTable.TaskStatus.STARTED.toString())) {
+ result = (TaskResult)
checkCurretResult.invoke(obj, taskRecord);
}
- MetaDataProtocol.MetaDataMutationResult result = new
MetaDataClient(pconn).updateCache(pconn.getTenantId(),
- schemaName, tableName, true);
- if (result.getMutationCode() !=
MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) {
- MetaDataEndpointImpl.dropChildViews(env,
tenantIdBytes, schemaNameBytes, tableNameBytes);
- } else if (System.currentTimeMillis() <
timeMaxInterval + timestamp.getTime()) {
- // skip this task as it has not been expired and
its parent table has not been dropped yet
- LOG.info("Skipping a child view drop task. The
parent table has not been dropped yet : " +
- schemaName + "." + tableName +
- " with tenant id " + (tenantId == null ? "
IS NULL" : tenantId) +
- " and timestamp " + timestamp.toString());
- continue;
+ if (result == null) {
+ // reread task record. There might be async
setting of task status
+ taskRecord = Task.queryTaskTable(connForTask,
taskRecord.getSchemaName(), taskRecord.getTableName(),
+ taskType, taskRecord.getTenantId(),
null).get(0);
+ if (taskRecord.getStatus() != null &&
taskRecord.getStatus().equals(
+ PTable.TaskStatus.COMPLETED.toString())) {
+ continue;
+ }
+ // Change task status to STARTED
+ Task.addTask(connForTask,
taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
+ taskRecord.getTableName(),
PTable.TaskStatus.STARTED.toString(),
+ taskRecord.getData(),
taskRecord.getPriority(), taskRecord.getTimeStamp(), null, true);
+
+ // invokes the method at runtime
+ result = (TaskResult) runMethod.invoke(obj,
taskRecord);
}
- else {
- LOG.warn(" A drop child view task has expired and
will be removed from the system task table : " +
- schemaName + "." + tableName +
- " with tenant id " + (tenantId == null ? "
IS NULL" : tenantId) +
- " and timestamp " + timestamp.toString());
+
+ if (result != null) {
+ String taskStatus =
PTable.TaskStatus.FAILED.toString();
+ if (result.getResultCode() ==
TaskResultCode.SUCCESS) {
+ taskStatus =
PTable.TaskStatus.COMPLETED.toString();
+ } else if (result.getResultCode() ==
TaskResultCode.SKIPPED) {
+ // We will pickup this task again
+ continue;
+ }
+
+ setEndTaskStatus(connForTask, taskRecord,
taskStatus);
}
- deleteTask(connForTask,
PTable.TaskType.DROP_CHILD_VIEWS, timestamp, tenantId, schemaName,
- tableName, this.accessCheckEnabled);
}
catch (Throwable t) {
- LOG.warn("Exception while dropping a child view task.
" +
+ LOG.warn("Exception while running self healingtask. " +
"It will be retried in the next system task
table scan : " +
- schemaName + "." + tableName +
- " with tenant id " + (tenantId == null ? " IS
NULL" : tenantId) +
- " and timestamp " + timestamp.toString(), t);
+ " taskType : " +
taskRecord.getTaskType().name() +
+ taskRecord.getSchemaName() + "." +
taskRecord.getTableName() +
+ " with tenant id " + (taskRecord.getTenantId()
== null ? " IS NULL" : taskRecord.getTenantId()) +
+ " and timestamp " +
taskRecord.getTimeStamp().toString(), t);
}
}
} catch (Throwable t) {
- LOG.error("DropChildViewsTask failed!", t);
+ LOG.error("SelfHealingTask failed!", t);
} finally {
if (connForTask != null) {
try {
connForTask.close();
} catch (SQLException ignored) {
- LOG.debug("DropChildViewsTask can't close connection",
ignored);
+ LOG.debug("SelfHealingTask can't close connection",
ignored);
}
}
}
}
+
+ public static void setEndTaskStatus(PhoenixConnection connForTask,
Task.TaskRecord taskRecord, String taskStatus)
+ throws IOException {
+ // update data with details.
+ String data = taskRecord.getData();
+ if (Strings.isNullOrEmpty(data)) {
+ data = "{}";
+ }
+ JsonParser jsonParser = new JsonParser();
+ JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject();
+ jsonObject.addProperty("TaskDetails", taskStatus);
+ data = jsonObject.toString();
+
+ Timestamp endTs = new
Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+ Task.addTask(connForTask, taskRecord.getTaskType(),
taskRecord.getTenantId(), taskRecord.getSchemaName(),
+ taskRecord.getTableName(), taskStatus, data,
taskRecord.getPriority(),
+ taskRecord.getTimeStamp(), endTs, true);
+ }
}
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/BaseTask.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/BaseTask.java
new file mode 100644
index 0000000..5c9a5c4
--- /dev/null
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/BaseTask.java
@@ -0,0 +1,17 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.schema.task.Task;
+
+public abstract class BaseTask {
+ protected long timeMaxInterval;
+ protected RegionCoprocessorEnvironment env;
+ public void init(RegionCoprocessorEnvironment env, Long interval) {
+ this.env = env;
+ this.timeMaxInterval = interval;
+ }
+ public abstract TaskRegionObserver.TaskResult run(Task.TaskRecord
taskRecord);
+
+ public abstract TaskRegionObserver.TaskResult
checkCurrentResult(Task.TaskRecord taskRecord) throws Exception;
+}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/DropChildViewsTask.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/DropChildViewsTask.java
new file mode 100644
index 0000000..f00e1f6
--- /dev/null
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/DropChildViewsTask.java
@@ -0,0 +1,81 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Properties;
+
+/**
+ * Task runs periodically to clean up task of child views whose parent is
dropped
+ *
+ */
+public class DropChildViewsTask extends BaseTask {
+ public static final Log LOG = LogFactory.getLog(DropChildViewsTask.class);
+
+ public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+ PhoenixConnection pconn = null;
+ Timestamp timestamp = taskRecord.getTimeStamp();
+ try {
+ String tenantId = taskRecord.getTenantId();
+ if (tenantId != null) {
+ Properties tenantProps = new Properties();
+ tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB,
tenantId);
+ pconn = QueryUtil.getConnectionOnServer(tenantProps,
env.getConfiguration()).unwrap(PhoenixConnection.class);
+ }
+ else {
+ pconn =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+ }
+
+ MetaDataProtocol.MetaDataMutationResult result = new
MetaDataClient(pconn).updateCache(pconn.getTenantId(),
+ taskRecord.getSchemaName(), taskRecord.getTableName(),
true);
+ if (result.getMutationCode() !=
MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) {
+ MetaDataEndpointImpl
+ .dropChildViews(env, taskRecord.getTenantIdBytes(),
taskRecord.getSchemaNameBytes(), taskRecord.getTableNameBytes());
+ return new
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+ } else if (System.currentTimeMillis() < timeMaxInterval +
timestamp.getTime()) {
+ // skip this task as it has not been expired and its parent
table has not been dropped yet
+ LOG.info("Skipping a child view drop task. The parent table
has not been dropped yet : " +
+ taskRecord.getSchemaName() + "." +
taskRecord.getTableName() +
+ " with tenant id " + (tenantId == null ? " IS NULL" :
tenantId) +
+ " and timestamp " + timestamp.toString());
+ return new
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "");
+ }
+ else {
+ LOG.warn(" A drop child view task has expired and will be
marked as failed : " +
+ taskRecord.getSchemaName() + "." +
taskRecord.getTableName() +
+ " with tenant id " + (tenantId == null ? " IS NULL" :
tenantId) +
+ " and timestamp " + timestamp.toString());
+ return new
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
"Expired");
+ }
+ }
+ catch (Throwable t) {
+ LOG.warn("Exception while dropping a child view task. " +
+ taskRecord.getSchemaName() + "." +
taskRecord.getTableName() +
+ " with tenant id " + (taskRecord.getTenantId() == null ? "
IS NULL" : taskRecord.getTenantId()) +
+ " and timestamp " + timestamp.toString(), t);
+ return new
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
t.toString());
+ } finally {
+ if (pconn != null) {
+ try {
+ pconn.close();
+ } catch (SQLException ignored) {
+ LOG.debug("DropChildViewsTask can't close connection",
ignored);
+ }
+ }
+ }
+ }
+
+ public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord
taskRecord) throws Exception {
+ return null;
+ }
+}
\ No newline at end of file
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
new file mode 100644
index 0000000..c2bdf51
--- /dev/null
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -0,0 +1,151 @@
+package org.apache.phoenix.coprocessor.tasks;
+
+import com.google.common.base.Strings;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.QueryUtil;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * Task runs periodically to rebuild indexes for System.Task entries.
+ *
+ */
+public class IndexRebuildTask extends BaseTask {
+ public static final String IndexName = "IndexName";
+ public static final String JobID = "JobID";
+ public static final Log LOG = LogFactory.getLog(IndexRebuildTask.class);
+
+ @Override
+ public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+ Connection conn = null;
+
+ try {
+ // We have to clone the configuration because env.getConfiguration
is readonly.
+ Configuration conf =
HBaseConfiguration.create(env.getConfiguration());
+ conn = QueryUtil.getConnectionOnServer(env.getConfiguration());
+
+ conf.set(QueryServices.TRANSACTIONS_ENABLED,
Boolean.TRUE.toString());
+
+ String data = taskRecord.getData();
+ if (Strings.isNullOrEmpty(taskRecord.getData())) {
+ data = "{}";
+ }
+ JsonParser jsonParser = new JsonParser();
+ JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject();
+ String indexName = getIndexName(jsonObject);
+ if (Strings.isNullOrEmpty(indexName)) {
+ String str = "Index name is not found. Index rebuild cannot
continue " +
+ "Data : " + data;
+ LOG.warn(str);
+ return new
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+ }
+
+ boolean shouldDisable = false;
+ if (jsonObject.has("DisableBefore")) {
+ String disableBefore =
jsonObject.get("DisableBefore").toString();
+ if (!Strings.isNullOrEmpty(disableBefore)) {
+ shouldDisable = Boolean.valueOf(disableBefore);
+ }
+ }
+
+ // Run index tool async.
+ boolean runForeground = false;
+ Map.Entry<Integer, Job> indexToolRes = IndexTool
+ .run(conf, taskRecord.getSchemaName(),
taskRecord.getTableName(), indexName, true,
+ false, taskRecord.getTenantId(), shouldDisable,
runForeground);
+ int status = indexToolRes.getKey();
+ if (status != 0) {
+ return new
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "Index
tool returned : " + status);
+ }
+
+ Job job = indexToolRes.getValue();
+
+ jsonObject.addProperty(JobID, job.getJobID().toString());
+ Task.addTask(conn.unwrap(PhoenixConnection.class ),
taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
+ taskRecord.getTableName(),
PTable.TaskStatus.STARTED.toString(), jsonObject.toString(),
taskRecord.getPriority(),
+ taskRecord.getTimeStamp(), null, true);
+ // It will take some time to finish, so we will check the status
in a separate task.
+ return null;
+ }
+ catch (Throwable t) {
+ LOG.warn("Exception while running index rebuild task. " +
+ "It will be retried in the next system task table scan : "
+
+ taskRecord.getSchemaName() + "." +
taskRecord.getTableName() +
+ " with tenant id " + (taskRecord.getTenantId() == null ? "
IS NULL" : taskRecord.getTenantId()) +
+ " and data " + taskRecord.getData(), t);
+ return new
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
t.toString());
+ } finally {
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ LOG.debug("IndexRebuildTask can't close connection");
+ }
+ }
+ }
+
+ }
+
+ private String getIndexName(JsonObject jsonObject) {
+ String indexName = null;
+ // Get index name from data column.
+ if (jsonObject.has(IndexName)) {
+ indexName = jsonObject.get(IndexName).toString().replaceAll("\"",
"");
+ }
+ return indexName;
+ }
+
+ private String getJobID(String data) {
+ if (Strings.isNullOrEmpty(data)) {
+ data = "{}";
+ }
+ JsonParser jsonParser = new JsonParser();
+ JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject();
+ String jobId = null;
+ if (jsonObject.has(JobID)) {
+ jobId = jsonObject.get(JobID).toString().replaceAll("\"", "");
+ }
+ return jobId;
+ }
+
+ @Override
+ public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord
taskRecord)
+ throws Exception {
+
+ String jobID = getJobID(taskRecord.getData());
+ if (jobID != null) {
+ Configuration conf =
HBaseConfiguration.create(env.getConfiguration());
+ Configuration configuration =
HBaseConfiguration.addHbaseResources(conf);
+ Cluster cluster = new Cluster(configuration);
+
+ Job job =
cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobID));
+
+ if (job != null && job.isComplete()) {
+ if (job.isSuccessful()) {
+ LOG.warn("IndexRebuildTask checkCurrentResult job is
successful " + taskRecord.getTableName());
+ return new
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+ } else {
+ return new
TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+ "Index is DISABLED");
+ }
+ }
+
+ }
+ return null;
+ }
+}
\ No newline at end of file
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index f747b90..208cf46 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -217,6 +217,11 @@ public class PhoenixDatabaseMetaData implements
DatabaseMetaData {
public static final byte[] TASK_TYPE_BYTES = Bytes.toBytes(TASK_TYPE);
public static final String TASK_TS = "TASK_TS";
public static final byte[] TASK_TS_BYTES = Bytes.toBytes(TASK_TS);
+ public static final String TASK_STATUS = "TASK_STATUS";
+ public static final String TASK_END_TS = "TASK_END_TS";
+ public static final String TASK_PRIORITY = "TASK_PRIORITY";
+ public static final String TASK_DATA = "TASK_DATA";
+ public static final String TASK_TABLE_TTL = "864000";
public static final String ARRAY_SIZE = "ARRAY_SIZE";
public static final byte[] ARRAY_SIZE_BYTES = Bytes.toBytes(ARRAY_SIZE);
public static final String VIEW_CONSTANT = "VIEW_CONSTANT";
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 7791472..051582e 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -28,10 +28,12 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import com.google.common.base.Strings;
import org.apache.commons.cli.CommandLine;
@@ -491,14 +493,17 @@ public class IndexTool extends Configured implements Tool
{
PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
}
- fs = outputPath.getFileSystem(configuration);
- fs.delete(outputPath, true);
-
+ if (outputPath != null) {
+ fs = outputPath.getFileSystem(configuration);
+ fs.delete(outputPath, true);
+ }
final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE,
schemaName, dataTable, indexTable);
final Job job = Job.getInstance(configuration, jobName);
job.setJarByClass(IndexTool.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- FileOutputFormat.setOutputPath(job, outputPath);
+ if (outputPath != null) {
+ FileOutputFormat.setOutputPath(job, outputPath);
+ }
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class,
PhoenixServerBuildIndexInputFormat.class,
qDataTable, "");
@@ -845,6 +850,48 @@ public class IndexTool extends Configured implements Tool {
return false;
}
+ public static Map.Entry<Integer, Job> run(Configuration conf, String
schemaName, String dataTable, String indexTable,
+ boolean directApi, boolean useSnapshot, String tenantId, boolean
disableBefore, boolean runForeground) throws Exception {
+ final List<String> args = Lists.newArrayList();
+ if (schemaName != null) {
+ args.add("-s");
+ args.add(schemaName);
+ }
+ args.add("-dt");
+ args.add(dataTable);
+ args.add("-it");
+ args.add(indexTable);
+ if (directApi) {
+ args.add("-direct");
+ }
+
+ if (runForeground) {
+ args.add("-runfg");
+ }
+
+ if (useSnapshot) {
+ args.add("-snap");
+ }
+
+ if (tenantId != null) {
+ args.add("-tenant");
+ args.add(tenantId);
+ }
+
+ args.add("-op");
+ args.add("/tmp/" + UUID.randomUUID().toString());
+
+ if (disableBefore) {
+ PhoenixConfigurationUtil.setDisableIndexes(conf, indexTable);
+ }
+
+ IndexTool indexingTool = new IndexTool();
+ indexingTool.setConf(conf);
+ int status = indexingTool.run(args.toArray(new String[0]));
+ Job job = indexingTool.getJob();
+ return new AbstractMap.SimpleEntry<Integer, Job>(status, job);
+ }
+
public static void main(final String[] args) throws Exception {
int result = ToolRunner.run(new IndexTool(), args);
System.exit(result);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 0786b9b..57688fd 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -18,17 +18,31 @@
package org.apache.phoenix.mapreduce.index;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
+
/**
* Reducer class that does only one task and that is to update the index state
of the table.
*/
@@ -41,9 +55,38 @@ public class PhoenixIndexImportDirectReducer extends
protected void cleanup(Context context) throws IOException,
InterruptedException{
try {
IndexToolUtil.updateIndexState(context.getConfiguration(),
PIndexState.ACTIVE);
+
+ updateTasksTable(context);
} catch (SQLException e) {
LOG.error(" Failed to update the status to Active");
throw new RuntimeException(e.getMessage());
}
}
+
+ private void updateTasksTable(Context context) throws SQLException,
IOException {
+ final Properties overrideProps = new Properties();
+ final Connection
+ connection = ConnectionUtil
+ .getOutputConnection(context.getConfiguration(),
overrideProps);
+ try {
+ String fullTableName =
PhoenixConfigurationUtil.getInputTableName(context.getConfiguration());
+ String tenantId =
context.getConfiguration().get(MAPREDUCE_TENANT_ID, null);
+ String schemaName =
SchemaUtil.getSchemaNameFromFullName(fullTableName);
+ String tableName =
SchemaUtil.getTableNameFromFullName(fullTableName);
+ String indexName =
PhoenixConfigurationUtil.getDisableIndexes(context.getConfiguration());
+ List<Task.TaskRecord> taskRecords =
Task.queryTaskTable(connection, schemaName, tableName,
+ PTable.TaskType.INDEX_REBUILD, tenantId, indexName);
+ if (taskRecords != null && taskRecords.size() > 0) {
+ for (Task.TaskRecord taskRecord : taskRecords) {
+ TaskRegionObserver.SelfHealingTask.setEndTaskStatus(
+ connection.unwrap(PhoenixConnection.class),
taskRecords.get(0),
+ PTable.TaskStatus.COMPLETED.toString());
+ }
+ }
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index ea33bd8..394b6ea 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -46,8 +46,10 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TABLE_TTL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
@@ -236,6 +238,7 @@ import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PUnsignedTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
@@ -3497,6 +3500,30 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
clearCache();
}
}
+
+ try {
+ metaConnection.createStatement().executeUpdate(getTaskDDL());
+ } catch (NewerTableAlreadyExistsException e) {
+
+ } catch (TableAlreadyExistsException e) {
+ long currentServerSideTableTimeStamp =
e.getTable().getTimeStamp();
+ if (currentServerSideTableTimeStamp <=
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0) {
+ String
+ columnsToAdd =
+ PhoenixDatabaseMetaData.TASK_STATUS + " " +
PVarchar.INSTANCE.getSqlTypeName() + ", "
+ + PhoenixDatabaseMetaData.TASK_END_TS + "
" + PTimestamp.INSTANCE.getSqlTypeName() + ", "
+ + PhoenixDatabaseMetaData.TASK_PRIORITY +
" " + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", "
+ + PhoenixDatabaseMetaData.TASK_DATA + " "
+ PVarchar.INSTANCE.getSqlTypeName();
+ String taskTableFullName =
SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TASK_TABLE);
+ metaConnection =
+ addColumnsIfNotExists(metaConnection,
taskTableFullName,
+
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd);
+ metaConnection.createStatement().executeUpdate(
+ "ALTER TABLE " + taskTableFullName + " SET " + TTL
+ "=" + TASK_TABLE_TTL);
+ clearCache();
+ }
+ }
+
try {
metaConnection.createStatement().executeUpdate(getFunctionTableDDL());
} catch (NewerTableAlreadyExistsException e) {} catch
(TableAlreadyExistsException e) {}
@@ -3509,9 +3536,6 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(getMutexDDL());
} catch (NewerTableAlreadyExistsException e) {} catch
(TableAlreadyExistsException e) {}
- try {
- metaConnection.createStatement().executeUpdate(getTaskDDL());
- } catch (NewerTableAlreadyExistsException e) {} catch
(TableAlreadyExistsException e) {}
// In case namespace mapping is enabled and system table to system
namespace mapping is also enabled,
// create an entry for the SYSTEM namespace in the SYSCAT table,
so that GRANT/REVOKE commands can work
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index c45fec9..6f9e8c1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -379,9 +379,15 @@ public interface QueryConstants {
TENANT_ID + " VARCHAR NULL," +
TABLE_SCHEM + " VARCHAR NULL," +
TABLE_NAME + " VARCHAR NOT NULL,\n" +
+ // Non-PK columns
+ TASK_STATUS + " VARCHAR NULL," +
+ TASK_END_TS + " TIMESTAMP NULL," +
+ TASK_PRIORITY + " UNSIGNED_TINYINT NULL," +
+ TASK_DATA + " VARCHAR NULL,\n" +
"CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
TASK_TYPE + "," + TASK_TS + " ROW_TIMESTAMP," + TENANT_ID + "," + TABLE_SCHEM +
"," +
TABLE_NAME + "))\n" +
HConstants.VERSIONS + "=%s,\n" +
HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" +
+ HColumnDescriptor.TTL + "=" + TASK_TABLE_TTL + ",\n" + // 10
days
PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 3e22225..6c4d3a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -194,7 +194,8 @@ public interface PTable extends PMetaDataEntity {
}
public enum TaskType {
- DROP_CHILD_VIEWS((byte)1);
+ DROP_CHILD_VIEWS((byte)1),
+ INDEX_REBUILD((byte)2);
private final byte[] byteValue;
private final byte serializedValue;
@@ -222,6 +223,29 @@ public interface PTable extends PMetaDataEntity {
}
}
+ public enum TaskStatus {
+ CREATED {
+ public String toString() {
+ return "CREATED";
+ }
+ },
+ STARTED {
+ public String toString() {
+ return "STARTED";
+ }
+ },
+ COMPLETED {
+ public String toString() {
+ return "COMPLETED";
+ }
+ },
+ FAILED {
+ public String toString() {
+ return "FAILED";
+ }
+ },
+ }
+
public enum ImmutableStorageScheme implements
ColumnValueEncoderDecoderSupplier {
ONE_CELL_PER_COLUMN((byte)1) {
@Override
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
new file mode 100644
index 0000000..babc2f5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -0,0 +1,369 @@
+package org.apache.phoenix.schema.task;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+public class Task {
+ private static void mutateSystemTaskTable(final PhoenixConnection conn,
final PreparedStatement stmt, boolean accessCheckEnabled)
+ throws IOException {
+ // we need to mutate SYSTEM.TASK with HBase/login user if access is
enabled.
+ if (accessCheckEnabled) {
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ final RpcServer.Call rpcContext = RpcUtil.getRpcContext();
+ // setting RPC context as null so that user can be reset
+ try {
+ RpcUtil.setRpcContext(null);
+ stmt.execute();
+ conn.commit();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ // setting RPC context back to original context of the
RPC
+ RpcUtil.setRpcContext(rpcContext);
+ }
+ return null;
+ }
+ });
+ }
+ else {
+ try {
+ stmt.execute();
+ conn.commit();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private static PreparedStatement setValuesToAddTaskPS(PreparedStatement
stmt, PTable.TaskType taskType,
+ String tenantId, String schemaName, String tableName) throws
SQLException {
+ stmt.setByte(1, taskType.getSerializedValue());
+ if (tenantId != null) {
+ stmt.setString(2, tenantId);
+ } else {
+ stmt.setNull(2, Types.VARCHAR);
+ }
+ if (schemaName != null) {
+ stmt.setString(3, schemaName);
+ } else {
+ stmt.setNull(3, Types.VARCHAR);
+ }
+ stmt.setString(4, tableName);
+ return stmt;
+ }
+
+ private static PreparedStatement setValuesToAddTaskPS(PreparedStatement
stmt, PTable.TaskType taskType,
+ String tenantId, String schemaName, String tableName, String
taskStatus, String data,
+ Integer priority, Timestamp startTs, Timestamp endTs) throws
SQLException {
+ stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName,
tableName);
+ if (taskStatus != null) {
+ stmt.setString(5, taskStatus);
+ } else {
+ stmt.setString(5, PTable.TaskStatus.CREATED.toString());
+ }
+ if (priority != null) {
+ stmt.setInt(6, priority);
+ } else {
+ byte defaultPri = 4;
+ stmt.setInt(6, defaultPri);
+ }
+ if (startTs == null) {
+ startTs = new
Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+ }
+ stmt.setTimestamp(7, startTs);
+ if (endTs != null) {
+ stmt.setTimestamp(8, endTs);
+ } else {
+ if (taskStatus != null &&
taskStatus.equals(PTable.TaskStatus.COMPLETED.toString())) {
+ endTs = new
Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+ stmt.setTimestamp(8, endTs);
+ } else {
+ stmt.setNull(8, Types.TIMESTAMP);
+ }
+ }
+ if (data != null) {
+ stmt.setString(9, data);
+ } else {
+ stmt.setNull(9, Types.VARCHAR);
+ }
+ return stmt;
+ }
+
+ public static void addTask(PhoenixConnection conn, PTable.TaskType
taskType, String tenantId, String schemaName,
+ String tableName, boolean accessCheckEnabled)
+ throws IOException {
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement("UPSERT INTO " +
+ PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
+ PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+ PhoenixDatabaseMetaData.TENANT_ID + ", " +
+ PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+ PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)");
+ stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName,
tableName);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+ }
+
+ public static void addTask(PhoenixConnection conn, PTable.TaskType
taskType, String tenantId, String schemaName,
+ String tableName, String taskStatus, String data, Integer
priority, Timestamp startTs, Timestamp endTs,
+ boolean accessCheckEnabled)
+ throws IOException {
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement("UPSERT INTO " +
+ PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
+ PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+ PhoenixDatabaseMetaData.TENANT_ID + ", " +
+ PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+ PhoenixDatabaseMetaData.TABLE_NAME + ", " +
+ PhoenixDatabaseMetaData.TASK_STATUS + ", " +
+ PhoenixDatabaseMetaData.TASK_PRIORITY + ", " +
+ PhoenixDatabaseMetaData.TASK_TS + ", " +
+ PhoenixDatabaseMetaData.TASK_END_TS + ", " +
+ PhoenixDatabaseMetaData.TASK_DATA +
+ " ) VALUES(?,?,?,?,?,?,?,?,?)");
+ stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName,
tableName, taskStatus, data, priority, startTs, endTs);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+ }
+
+ public static void deleteTask(PhoenixConnection conn, PTable.TaskType
taskType, Timestamp ts, String tenantId,
+ String schemaName, String tableName, boolean accessCheckEnabled)
throws IOException {
+ PreparedStatement stmt = null;
+ try {
+ stmt = conn.prepareStatement("DELETE FROM " +
+ PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+ " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND
" +
+ PhoenixDatabaseMetaData.TASK_TS + " = ? AND " +
+ PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? "
IS NULL " : " = '" + tenantId + "'") + " AND " +
+ PhoenixDatabaseMetaData.TABLE_SCHEM + (schemaName == null
? " IS NULL " : " = '" + schemaName + "'") + " AND " +
+ PhoenixDatabaseMetaData.TABLE_NAME + " = ?");
+ stmt.setByte(1, taskType.getSerializedValue());
+ stmt.setTimestamp(2, ts);
+ stmt.setString(3, tableName);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+ }
+
+ private static List<TaskRecord> populateTasks(Connection connection,
String taskQuery)
+ throws SQLException {
+ PreparedStatement taskStatement =
connection.prepareStatement(taskQuery);
+ ResultSet rs = taskStatement.executeQuery();
+
+ List<TaskRecord> result = new ArrayList<>();
+ while (rs.next()) {
+ // delete child views only if the parent table is deleted from the
system catalog
+ TaskRecord taskRecord = parseResult(rs);
+ result.add(taskRecord);
+ }
+ return result;
+ }
+
+ public static List<TaskRecord> queryTaskTable(Connection connection,
String schema, String tableName,
+ PTable.TaskType taskType, String tenantId, String indexName)
+ throws SQLException {
+ String taskQuery = "SELECT " +
+ PhoenixDatabaseMetaData.TASK_TS + ", " +
+ PhoenixDatabaseMetaData.TENANT_ID + ", " +
+ PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+ PhoenixDatabaseMetaData.TABLE_NAME + ", " +
+ PhoenixDatabaseMetaData.TASK_STATUS + ", " +
+ PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+ PhoenixDatabaseMetaData.TASK_PRIORITY + ", " +
+ PhoenixDatabaseMetaData.TASK_DATA +
+ " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
+ taskQuery += " WHERE " +
+ PhoenixDatabaseMetaData.TABLE_NAME + " ='" + tableName +
"' AND " +
+ PhoenixDatabaseMetaData.TASK_TYPE + "=" +
taskType.getSerializedValue();
+ if (!Strings.isNullOrEmpty(tenantId)) {
+ taskQuery += " AND " + PhoenixDatabaseMetaData.TENANT_ID +
"='" + tenantId + "' ";
+ }
+
+ if (!Strings.isNullOrEmpty(schema)) {
+ taskQuery += " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM +
"='" + schema + "' ";
+ }
+
+ if (!Strings.isNullOrEmpty(indexName)) {
+ taskQuery += " AND " + PhoenixDatabaseMetaData.TASK_DATA + "
LIKE '%" + indexName + "%'";
+ }
+
+ return populateTasks(connection, taskQuery);
+ }
+
+ public static List<TaskRecord> queryTaskTable(Connection connection,
String[] excludedTaskStatus)
+ throws SQLException {
+ String taskQuery = "SELECT " +
+ PhoenixDatabaseMetaData.TASK_TS + ", " +
+ PhoenixDatabaseMetaData.TENANT_ID + ", " +
+ PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+ PhoenixDatabaseMetaData.TABLE_NAME + ", " +
+ PhoenixDatabaseMetaData.TASK_STATUS + ", " +
+ PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+ PhoenixDatabaseMetaData.TASK_PRIORITY + ", " +
+ PhoenixDatabaseMetaData.TASK_DATA +
+ " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
+ if (excludedTaskStatus != null && excludedTaskStatus.length > 0) {
+ taskQuery += " WHERE " + PhoenixDatabaseMetaData.TASK_STATUS + "
IS NULL OR " +
+ PhoenixDatabaseMetaData.TASK_STATUS + " NOT IN (";
+ String[] values = new String[excludedTaskStatus.length];
+ for (int i=0; i < excludedTaskStatus.length; i++) {
+ values[i] = String.format("'%s'",
excludedTaskStatus[i].trim());
+ }
+
+ //Delimit with comma
+ taskQuery += String.join(",", values);
+ taskQuery += ")";
+ }
+
+ return populateTasks(connection, taskQuery);
+ }
+
+ public static TaskRecord parseResult(ResultSet rs) throws SQLException {
+ TaskRecord taskRecord = new TaskRecord();
+
taskRecord.setTimeStamp(rs.getTimestamp(PhoenixDatabaseMetaData.TASK_TS));
+
taskRecord.setTenantId(rs.getString(PhoenixDatabaseMetaData.TENANT_ID));
+
taskRecord.setTenantIdBytes(rs.getBytes(PhoenixDatabaseMetaData.TENANT_ID));
+
taskRecord.setSchemaName(rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM));
+
taskRecord.setSchemaNameBytes(rs.getBytes(PhoenixDatabaseMetaData.TABLE_SCHEM));
+
taskRecord.setTableName(rs.getString(PhoenixDatabaseMetaData.TABLE_NAME));
+
taskRecord.setTableNameBytes(rs.getBytes(PhoenixDatabaseMetaData.TABLE_NAME));
+
taskRecord.setStatus(rs.getString(PhoenixDatabaseMetaData.TASK_STATUS));
+
taskRecord.setTaskType(PTable.TaskType.fromSerializedValue(rs.getByte(PhoenixDatabaseMetaData.TASK_TYPE
)));
+
taskRecord.setPriority(rs.getInt(PhoenixDatabaseMetaData.TASK_PRIORITY));
+ taskRecord.setData(rs.getString(PhoenixDatabaseMetaData.TASK_DATA));
+ return taskRecord;
+ }
+
+ public static class TaskRecord {
+ private String tenantId;
+ private Timestamp timeStamp;
+ private byte[] tenantIdBytes;
+ private String schemaName= null;
+ private byte[] schemaNameBytes;
+ private String tableName = null;
+ private byte[] tableNameBytes;
+
+ private PTable.TaskType taskType;
+ private String status;
+ private int priority;
+ private String data;
+
+ public String getTenantId() {
+ return tenantId;
+ }
+
+ public void setTenantId(String tenantId) {
+ this.tenantId = tenantId;
+ }
+
+ public Timestamp getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(Timestamp timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ public byte[] getTenantIdBytes() {
+ return tenantIdBytes;
+ }
+
+ public void setTenantIdBytes(byte[] tenantIdBytes) {
+ this.tenantIdBytes = tenantIdBytes;
+ }
+
+ public String getSchemaName() {
+ return schemaName;
+ }
+
+ public void setSchemaName(String schemaName) {
+ this.schemaName = schemaName;
+ }
+
+ public byte[] getSchemaNameBytes() {
+ return schemaNameBytes;
+ }
+
+ public void setSchemaNameBytes(byte[] schemaNameBytes) {
+ this.schemaNameBytes = schemaNameBytes;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public byte[] getTableNameBytes() {
+ return tableNameBytes;
+ }
+
+ public void setTableNameBytes(byte[] tableNameBytes) {
+ this.tableNameBytes = tableNameBytes;
+ }
+
+ public String getData() {
+ if (data == null) {
+ return "";
+ }
+ return data;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public PTable.TaskType getTaskType() {
+ return taskType;
+ }
+
+ public void setTaskType(PTable.TaskType taskType) {
+ this.taskType = taskType;
+ }
+
+ }
+}