[
https://issues.apache.org/jira/browse/PHOENIX-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17476469#comment-17476469
]
ASF GitHub Bot commented on PHOENIX-6622:
-----------------------------------------
gjacoby126 commented on a change in pull request #1373:
URL: https://github.com/apache/phoenix/pull/1373#discussion_r785213852
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_RETRY_COUNT;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_RETRY_COUNT_VALUE;
+
+/**
+ * Task runs periodically to monitor and orchestrate ongoing transforms in System.Transform table.
+ *
+ */
+public class TransformMonitorTask extends BaseTask {
+ public static final String DEFAULT = "IndexName";
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(TransformMonitorTask.class);
+
+ private static boolean isDisabled = false;
+
+ // Called from testing
+ @VisibleForTesting
+ public static void disableTransformMonitorTask(boolean disabled) {
+ isDisabled = disabled;
+ }
+
+ @Override
+ public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+ Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+ Configuration configuration = HBaseConfiguration.addHbaseResources(conf);
+ boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED);
+ if (!transformMonitorEnabled || isDisabled) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformMonitor is disabled");
+ }
+
+ try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)){
+ SystemTransformRecord systemTransformRecord = Transform.getTransformRecord(taskRecord.getSchemaName(),
+ taskRecord.getTableName(), null, taskRecord.getTenantId(), conn);
+ if (systemTransformRecord == null) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+ "No transform record is found");
+ }
+ String tableName = SchemaUtil.getTableName(systemTransformRecord.getSchemaName(), systemTransformRecord.getLogicalTableName());
+
+ if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.CREATED.name())) {
+ LOGGER.info("Transform is created, starting the TransformTool ", tableName);
+ // Kick a TransformTool run, it will already update transform record status and job id
+ TransformTool transformTool = runTransformTool(systemTransformRecord, conf, false, null, null, false, false);
+ if (transformTool == null) {
+ // This is not a map/reduce error. There must be some unexpected issue. So, retrying will not solve the underlying issue.
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformTool run failed. Check the parameters.");
+ }
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+ LOGGER.info("Transform is completed, TransformMonitor is done ", tableName);
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name()) &&
+ !PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType())) {
+ LOGGER.info("Transform is pending cutover ", tableName);
+ // TODO: check if there are any org migrations on this table
Review comment:
nit: can remove the TODO here as it's not relevant upstream
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
##########
@@ -775,30 +798,58 @@ public void resumeTransform(String[] args, CommandLine cmdLine) throws Exception
}
runTransform(args, cmdLine);
+
+ // Check if we already have a TransformMonitor task. If we do, remove those and start a new monitor
+ List<Task.TaskRecord> taskRecordList = Task.queryTaskTable(connection, null);
+ for (Task.TaskRecord taskRecord : taskRecordList) {
+ if (taskRecord.getTaskType() == PTable.TaskType.TRANSFORM_MONITOR) {
+ Task.deleteTask(connection.unwrap(PhoenixConnection.class), PTable.TaskType.TRANSFORM_MONITOR, taskRecord.getTimeStamp(), taskRecord.getTenantId(),
Review comment:
Isn't each TransformMonitor Task only meant for one particular transform
record? Shouldn't we only delete the task if taskRecord matches the transform
record we're interested in?
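
For illustration only, a rough sketch of the check the reviewer is describing (a hypothetical helper, not part of the PR; it reuses only getters already visible in these hunks and assumes the SystemTransformRecord being resumed is in scope at the call site):

    import java.util.Objects;

    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.task.Task;
    import org.apache.phoenix.schema.transform.SystemTransformRecord;

    // Hypothetical helper: true only when a TRANSFORM_MONITOR task targets the same
    // schema, table and tenant as the transform record being resumed, so only that
    // task would be deleted before a new monitor is scheduled.
    final class TransformMonitorTaskMatcher {
        static boolean matchesTransform(Task.TaskRecord taskRecord, SystemTransformRecord record) {
            return taskRecord.getTaskType() == PTable.TaskType.TRANSFORM_MONITOR
                    && Objects.equals(taskRecord.getSchemaName(), record.getSchemaName())
                    && Objects.equals(taskRecord.getTableName(), record.getLogicalTableName())
                    && Objects.equals(taskRecord.getTenantId(), record.getTenantId());
        }
    }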
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
##########
@@ -1117,7 +1117,7 @@ public static Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter val
ColumnReference indexColRef = colRefPair.getFirst();
ColumnReference dataColRef = colRefPair.getSecond();
byte[] value = null;
- if (srcImmutableStroageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ if (srcImmutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
Review comment:
nice typo fix. :-)
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/TransformMonitorTask.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor.tasks;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.transform.TransformTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.task.SystemTaskParams;
+import org.apache.phoenix.schema.task.Task;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_TRANSFORM_RETRY_COUNT;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_MONITOR_ENABLED;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.TRANSFORM_RETRY_COUNT_VALUE;
+
+/**
+ * Task runs periodically to monitor and orchestrate ongoing transforms in System.Transform table.
+ *
+ */
+public class TransformMonitorTask extends BaseTask {
+ public static final String DEFAULT = "IndexName";
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(TransformMonitorTask.class);
+
+ private static boolean isDisabled = false;
+
+ // Called from testing
+ @VisibleForTesting
+ public static void disableTransformMonitorTask(boolean disabled) {
+ isDisabled = disabled;
+ }
+
+ @Override
+ public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) {
+ Configuration conf = HBaseConfiguration.create(env.getConfiguration());
+ Configuration configuration = HBaseConfiguration.addHbaseResources(conf);
+ boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED);
+ if (!transformMonitorEnabled || isDisabled) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformMonitor is disabled");
+ }
+
+ try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(conf).unwrap(PhoenixConnection.class)){
+ SystemTransformRecord systemTransformRecord = Transform.getTransformRecord(taskRecord.getSchemaName(),
+ taskRecord.getTableName(), null, taskRecord.getTenantId(), conn);
+ if (systemTransformRecord == null) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL,
+ "No transform record is found");
+ }
+ String tableName = SchemaUtil.getTableName(systemTransformRecord.getSchemaName(), systemTransformRecord.getLogicalTableName());
+
+ if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.CREATED.name())) {
+ LOGGER.info("Transform is created, starting the TransformTool ", tableName);
+ // Kick a TransformTool run, it will already update transform record status and job id
+ TransformTool transformTool = runTransformTool(systemTransformRecord, conf, false, null, null, false, false);
+ if (transformTool == null) {
+ // This is not a map/reduce error. There must be some unexpected issue. So, retrying will not solve the underlying issue.
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "TransformTool run failed. Check the parameters.");
+ }
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+ LOGGER.info("Transform is completed, TransformMonitor is done ", tableName);
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, "");
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name()) &&
+ !PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType())) {
+ LOGGER.info("Transform is pending cutover ", tableName);
+ // TODO: check if there are any org migrations on this table
+ Transform.doCutover(conn, systemTransformRecord);
+
+ PTable.TransformType partialTransform = PTable.TransformType.getPartialTransform(systemTransformRecord.getTransformType());
+ if (partialTransform != null) {
+ // Update transform to be partial
+ SystemTransformRecord.SystemTransformBuilder builder = new SystemTransformRecord.SystemTransformBuilder(systemTransformRecord);
+ builder.setTransformType(partialTransform);
+ // Decrement retry count since TransformTool will increment it. Should we set it to 0?
+ builder.setTransformRetryCount(systemTransformRecord.getTransformRetryCount()-1);
+ Transform.upsertTransform(builder.build(), conn);
+
+ // Fix unverified rows. Running partial transform will make the transform status go back to started
+ long startFromTs = 0;
+ if (systemTransformRecord.getTransformLastStateTs() != null) {
+ startFromTs = systemTransformRecord.getTransformLastStateTs().getTime()-1;
+ }
+ runTransformTool(systemTransformRecord, conf, true, startFromTs, null, true, false);
+
+ // In the future, if we are changing the PK structure, we need to run indextools as well
+ } else {
+ // No partial transform needed so, we update state of the transform
+ LOGGER.warn("No partial type of the transform is found. Completing the transform ", tableName);
+ Transform.updateTransformRecord(conn, systemTransformRecord, PTable.TransformStatus.COMPLETED);
+ }
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) ||
+ (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PENDING_CUTOVER.name()) &&
+ PTable.TransformType.isPartialTransform(systemTransformRecord.getTransformType()))) {
+ LOGGER.info(systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.STARTED.name()) ?
+ "Transform is started, we will monitor ": "Partial transform is going on, we will monitor" , tableName);
+ // Monitor the job of transform tool and decide to retry
+ String jobId = systemTransformRecord.getTransformJobId();
+ if (jobId != null) {
+ Cluster cluster = new Cluster(configuration);
+
+ Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobId));
+ if (job == null) {
+ LOGGER.warn(String.format("Transform job with Id=%s is not found", jobId));
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "The job cannot be found");
+ }
+ if (job != null && job.isComplete()) {
+ if (job.isSuccessful()) {
+ LOGGER.warn("TransformTool job is successful. Transform should have been in a COMPLETED state "
+ + taskRecord.getTableName());
+ } else {
+ // Retry TransformTool run
+ int maxRetryCount = configuration.getInt(TRANSFORM_RETRY_COUNT_VALUE, DEFAULT_TRANSFORM_RETRY_COUNT);
+ if (systemTransformRecord.getTransformRetryCount() < maxRetryCount) {
+ // Retry count will be incremented in TransformTool
+ runTransformTool(systemTransformRecord, conf, false, null, null, false, true);
+ }
+ }
+ }
+ }
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.FAILED.name())) {
+ String str = "Transform is marked as failed because either TransformTool is run on the foreground and failed " +
+ "or it is run as async but there is something wrong with the TransformTool parameters";
+ LOGGER.error(str);
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+ } else if (systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.PAUSED.name())) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS,
+ "Transform is paused. No need to monitor");
+ } else {
+ String str = "Transform status is not known " + systemTransformRecord.getString();
+ LOGGER.error(str);
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str);
+ }
+
+ // Update task status to RETRY so that it is retried
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(conn)
+ .setTaskType(taskRecord.getTaskType())
+ .setTenantId(taskRecord.getTenantId())
+ .setSchemaName(taskRecord.getSchemaName())
+ .setTableName(taskRecord.getTableName())
+ .setTaskStatus(PTable.TaskStatus.RETRY.toString())
+ .setData(taskRecord.getData())
+ .setPriority(taskRecord.getPriority())
+ .setStartTs(taskRecord.getTimeStamp())
+ .setEndTs(null)
+ .setAccessCheckEnabled(true)
+ .build());
+ return null;
+ }
+ catch (Throwable t) {
+ LOGGER.warn("Exception while running transform monitor task. " +
+ "It will be retried in the next system task table scan : "
+ + taskRecord.getSchemaName() + "." + taskRecord.getTableName() +
+ " with tenant id " + (taskRecord.getTenantId() == null ? " IS NULL" : taskRecord.getTenantId()) +
+ " and data " + taskRecord.getData(), t);
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString());
+ }
+ }
+
+ public static void addTransformMonitorTask(PhoenixConnection connection, Configuration configuration, SystemTransformRecord systemTransformRecord,
+ PTable.TaskStatus taskStatus, Timestamp startTimestamp, Timestamp endTimestamp) throws IOException {
+ boolean transformMonitorEnabled = configuration.getBoolean(TRANSFORM_MONITOR_ENABLED, DEFAULT_TRANSFORM_MONITOR_ENABLED);
+ if (!transformMonitorEnabled) {
+ LOGGER.warn("TransformMonitor is not enabled. Monitoring/retrying TransformTool and doing cutover will not be done automatically");
+ return;
+ }
+
+ Task.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
+ .setConn(connection)
+ .setTaskType(PTable.TaskType.TRANSFORM_MONITOR)
+ .setTenantId(systemTransformRecord.getTenantId())
+ .setSchemaName(systemTransformRecord.getSchemaName())
+ .setTableName(systemTransformRecord.getLogicalTableName())
+ .setTaskStatus(taskStatus.toString())
+ .setStartTs(startTimestamp)
+ .setEndTs(endTimestamp)
+ .setAccessCheckEnabled(true)
+ .build());
+ }
+
+ private TransformTool runTransformTool(SystemTransformRecord systemTransformRecord, Configuration configuration,
Review comment:
Should this method be a static method on TransformTool, so the
command-line logic only lives in one place?
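
For illustration only, the shape such a shared entry point might take as a method on TransformTool (a sketch, not the PR's actual code; the parameter names after the first two are guesses based on the runTransformTool(...) call sites in the hunks above, and the body is deliberately left as comments):

    // Hypothetical static entry point on TransformTool. TransformMonitorTask and any
    // CLI or test caller would go through this one method, so the argument building
    // and job submission logic is written exactly once.
    public static TransformTool runTransformTool(SystemTransformRecord record, Configuration configuration,
            boolean isPartialTransform, Long startFromTs, Long endTs,
            boolean shouldFixUnverified, boolean isRetry) throws Exception {
        // 1. Build the command-line arguments (table, tenant, partial/retry flags,
        //    start timestamp) from the transform record.
        // 2. Parse the arguments and submit the MR job, the same path the CLI uses.
        // 3. Return the configured tool, or null when the job could not be submitted,
        //    matching the null check in TransformMonitorTask.run().
        return null; // placeholder; the real method would return the launched tool
    }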
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> TransformMonitor should orchestrate transform and do retries
> ------------------------------------------------------------
>
> Key: PHOENIX-6622
> URL: https://issues.apache.org/jira/browse/PHOENIX-6622
> Project: Phoenix
> Issue Type: Sub-task
> Reporter: Gokcen Iskender
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)