ChinmaySKulkarni commented on a change in pull request #960:
URL: https://github.com/apache/phoenix/pull/960#discussion_r520956276
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
##########
@@ -431,6 +431,8 @@ public SQLException newException(SQLExceptionInfo info) {
PTable.LinkType.CHILD_TABLE + ") for view"),
TABLE_NOT_IN_REGION(1145, "XCL45", "No modifications allowed on this
table. "
+ "Table not in this region."),
+ UNABLE_TO_UPSERT_TASK(1146, "XCL46",
+ "Error upserting records in Task system table"),
Review comment:
nit: Change to "SYSTEM.TASK" table
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
##########
@@ -75,6 +85,38 @@ public Void run() throws Exception {
}
}
+ private static List<Mutation> getMutationsForSystemTaskTable(
+ PhoenixConnection conn, PreparedStatement stmt,
+ boolean accessCheckEnabled) throws IOException {
+ // we need to mutate SYSTEM.TASK with HBase/login user if access is
enabled.
+ if (accessCheckEnabled) {
+ return User.runAsLoginUser(() -> {
+ final RpcCall rpcContext = RpcUtil.getRpcContext();
+ // setting RPC context as null so that user can be reset
+ try {
+ RpcUtil.setRpcContext(null);
+ stmt.execute();
+ // retrieve mutations for SYSTEM.TASK upsert query
+ return conn.getMutationState().toMutations().next()
Review comment:
If other mutations were made using the same connection, we will be
returning the joined state of all of them here right? Don't we want to restrict
to just returning mutations corresponding to the upsert into the SYSTEM.TASK
table?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
##########
@@ -1094,6 +1095,11 @@ private void addCoprocessors(byte[] tableName,
TableDescriptorBuilder builder,
if(!newDesc.hasCoprocessor(TaskRegionObserver.class.getName())) {
builder.addCoprocessor(TaskRegionObserver.class.getName(),
null, priority, null);
}
+ if (!newDesc.hasCoprocessor(
Review comment:
If SYSTEM.TASK already exists on a cluster and you upgrade to 4.16
server bits and connect with a 4.16 client, will we still install this new
coproc or does that need extra steps in the upgrade path?
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
##########
@@ -141,12 +181,67 @@ public static void addTask(PhoenixConnection conn,
PTable.TaskType taskType, Str
PhoenixDatabaseMetaData.TASK_END_TS + ", " +
PhoenixDatabaseMetaData.TASK_DATA +
" ) VALUES(?,?,?,?,?,?,?,?,?)");
- stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName,
tableName, taskStatus, data, priority, startTs, endTs);
- LOGGER.info("Adding task " + taskType + "," +tableName + "," +
taskStatus + "," + startTs, ","+endTs);
+ stmt = setValuesToAddTaskPS(stmt, systemTaskParams.getTaskType(),
+ systemTaskParams.getTenantId(),
+ systemTaskParams.getSchemaName(),
+ systemTaskParams.getTableName(),
+ systemTaskParams.getTaskStatus(), systemTaskParams.getData(),
+ systemTaskParams.getPriority(), systemTaskParams.getStartTs(),
+ systemTaskParams.getEndTs());
+ LOGGER.info("Adding task type: {} , tableName: {} , taskStatus: {}"
+ + " , startTs: {} , endTs: {}", systemTaskParams.getTaskType(),
+ systemTaskParams.getTableName(),
+ systemTaskParams.getTaskStatus(),
systemTaskParams.getStartTs(),
+ systemTaskParams.getEndTs());
} catch (SQLException e) {
throw new IOException(e);
}
- mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
+ // if query is getting executed by client (useTaskEndpoint is false),
Review comment:
I'm not sure I understand this comment. We want to use the endpoint if
triggered from client-side right? Can you please explain the use of
`useTaskEndpoint`?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
##########
@@ -4668,11 +4669,32 @@ public MutationState alterIndex(AlterIndexStatement
statement) throws SQLExcepti
}};
try {
String json =
JacksonUtil.getObjectWriter().writeValueAsString(props);
- Task.addTask(connection,
PTable.TaskType.INDEX_REBUILD,
- tenantId, schemaName,
- dataTableName,
PTable.TaskStatus.CREATED.toString(),
- json, null, ts, null, true);
- connection.commit();
+ List<Mutation> sysTaskUpsertMutations =
Task.getMutationsForAddTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(connection)
+ .setTaskType(
+ PTable.TaskType.INDEX_REBUILD)
+ .setTenantId(tenantId)
+ .setSchemaName(schemaName)
+ .setTableName(dataTableName)
+ .setTaskStatus(
+
PTable.TaskStatus.CREATED.toString())
+ .setData(json)
+ .setPriority(null)
+ .setStartTs(ts)
+ .setEndTs(null)
+ .setAccessCheckEnabled(true)
+ .setUseTaskEndpoint(true).build());
Review comment:
I'm unclear on the use of `useTaskEndpoint`. Here we are calling the
endpoint anyways. In which cases are we planning on not using the endpoint?
server-side invocations?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]