Github user karanmehta93 commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/359#discussion_r222443397

--- Diff: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.TaskType;
+
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+
+/**
+ * Coprocessor for task related operations. This coprocessor would only be registered
+ * to SYSTEM.TASK table
+ */
+@SuppressWarnings("deprecation")
+public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
+    public static final Log LOG = LogFactory.getLog(TaskRegionObserver.class);
+    protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(TaskType.values().length);
+    private long timeInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL;
+    private long timeMaxInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL;
+    @GuardedBy("TaskRegionObserver.class")
+    // Added for test purposes
+    private long initialDelay = 0;
+
+    @Override
+    public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
+            boolean abortRequested) {
+        executor.shutdownNow();
+    }
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+        return Optional.of(this);
+    }
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        Configuration config = env.getConfiguration();
+        timeInterval =
+                config.getLong(
+                    QueryServices.TASK_HANDLING_INTERVAL_ATTRIB,
+                    QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL);
+        timeMaxInterval =
+                config.getLong(
+                    QueryServices.TASK_HANDLING_MAX_INTERVAL_ATTRIB,
+                    QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL);
+        initialDelay =
+                config.getLong(
+                    QueryServices.TASK_HANDLING_INITIAL_DELAY_ATTRIB,
+                    QueryServicesOptions.DEFAULT_TASK_HANDLING_INITIAL_DELAY);
+    }
+
+    @Override
+    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
+        final RegionCoprocessorEnvironment env = e.getEnvironment();
+
+        // turn off verbose deprecation logging
+        Logger deprecationLogger = Logger.getLogger("org.apache.hadoop.conf.Configuration.deprecation");
+        if (deprecationLogger != null) {
+            deprecationLogger.setLevel(Level.WARN);
+        }
+        try {
+            Class.forName(PhoenixDriver.class.getName());
+            DropChildViewsTask task = new DropChildViewsTask(e.getEnvironment(), timeMaxInterval);
+            executor.scheduleWithFixedDelay(task, initialDelay, timeInterval, TimeUnit.MILLISECONDS);
+        } catch (ClassNotFoundException ex) {
+            LOG.error("ScheduleTask cannot start!", ex);
+        }
+    }
+
+    public static void addTask(PhoenixConnection conn, TaskType taskType, String tenantId, String schemaName, String tableName)
+            throws IOException {
+        try {
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " +
+                    PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " (" +
+                    PhoenixDatabaseMetaData.TASK_TYPE + ", " +
+                    PhoenixDatabaseMetaData.TENANT_ID + ", " +
+                    PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+                    PhoenixDatabaseMetaData.TABLE_NAME + ") VALUES(?,?,?,?)");
+            stmt.setByte(1, taskType.getSerializedValue());
+            if (tenantId != null) {
+                stmt.setString(2, tenantId);
+            }else {
--- End diff --

nit: spaces before/after brackets.
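For example, something like the following (just a sketch of the brace spacing, assuming the nit refers to the "}else {" line above; the body of the else branch is elided in the diff hunk, so it is shown as a placeholder):

    if (tenantId != null) {
        stmt.setString(2, tenantId);
    } else { // space between '}' and 'else'
        // ... else branch as in the PR, unchanged
    }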
---