[ https://issues.apache.org/jira/browse/PHOENIX-4764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637444#comment-16637444 ]
ASF GitHub Bot commented on PHOENIX-4764: ----------------------------------------- Github user karanmehta93 commented on a diff in the pull request: https://github.com/apache/phoenix/pull/359#discussion_r222443657 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; + +import java.util.Optional; +import java.util.Properties; +import java.util.TimerTask; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.annotation.concurrent.GuardedBy; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.phoenix.cache.GlobalCache; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.TaskType; + +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; + + +/** + * Coprocessor for task related operations. This coprocessor would only be registered + * to SYSTEM.TASK table + */ +@SuppressWarnings("deprecation") +public class TaskRegionObserver implements RegionObserver, RegionCoprocessor { + public static final Log LOG = LogFactory.getLog(TaskRegionObserver.class); + protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(TaskType.values().length); + private long timeInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL; + private long timeMaxInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL; + @GuardedBy("TaskRegionObserver.class") + // Added for test purposes + private long initialDelay = 0; + + @Override + public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c, + boolean abortRequested) { + executor.shutdownNow(); + GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll(); + } + + @Override + public Optional<RegionObserver> getRegionObserver() { + return Optional.of(this); + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves + // among region servers because we relies on server time of RS which is hosting + // SYSTEM.CATALOG + Configuration config = env.getConfiguration(); + long sleepTime = config.getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB, + QueryServicesOptions.DEFAULT_CLOCK_SKEW_INTERVAL); + try { + if(sleepTime > 0) { + Thread.sleep(sleepTime); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + timeInterval = + config.getLong( + QueryServices.TASK_HANDLING_INTERVAL_ATTRIB, + QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL); + timeMaxInterval = + config.getLong( + QueryServices.TASK_HANDLING_MAX_INTERVAL_ATTRIB, + QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL); + initialDelay = + config.getLong( + QueryServices.TASK_HANDLING_INITIAL_DELAY_ATTRIB, + QueryServicesOptions.DEFAULT_TASK_HANDLING_INITIAL_DELAY); + } + + @Override + public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) { + final RegionCoprocessorEnvironment env = e.getEnvironment(); + + // turn off verbose deprecation logging + Logger deprecationLogger = Logger.getLogger("org.apache.hadoop.conf.Configuration.deprecation"); + if (deprecationLogger != null) { + deprecationLogger.setLevel(Level.WARN); + } + try { + Class.forName(PhoenixDriver.class.getName()); + DropChildViewsTask task = new DropChildViewsTask(e.getEnvironment(), timeMaxInterval); + executor.scheduleWithFixedDelay(task, initialDelay, timeInterval, TimeUnit.MILLISECONDS); + } catch (ClassNotFoundException ex) { + LOG.error("ScheduleTask cannot start!", ex); + } + } + + public static void addTask(PhoenixConnection conn, TaskType taskType, Timestamp ts, String tenantId, String schemaName, String tableName) throws IOException { + try { + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " VALUES(?,?,?,?,?)"); + stmt.setByte(1, taskType.getSerializedValue()); + stmt.setTimestamp(2, ts); + if (tenantId != null) { + stmt.setString(3, tenantId); + }else { + stmt.setNull(3, Types.VARCHAR); + } + stmt.setString(4, schemaName); + stmt.setString(5, tableName); + stmt.execute(); + conn.commit(); + } catch (SQLException e) { + throw new IOException(e); + } + } + + public static void deleteTask(PhoenixConnection conn, TaskType taskType, Timestamp ts, String tenantId, String schemaName, String tableName) throws IOException { + try { + PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + + " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND " + + PhoenixDatabaseMetaData.TASK_TS + " = ? AND " + + PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? " IS NULL " : " = '" + tenantId + "'") + " AND " + + PhoenixDatabaseMetaData.TABLE_SCHEM + "= ? AND " + + PhoenixDatabaseMetaData.TABLE_NAME + " = ?"); + stmt.setByte(1, taskType.getSerializedValue()); + stmt.setTimestamp(2, ts); + stmt.setString(3, schemaName); + stmt.setString(4, tableName); + stmt.execute(); + conn.commit(); + } catch (SQLException e) { + throw new IOException(e); + } + } + + /** + * Task runs periodically to clean up task of child views whose parent is dropped + * + */ + public static class DropChildViewsTask extends TimerTask { + private RegionCoprocessorEnvironment env; + private long timeMaxInterval; + + public DropChildViewsTask(RegionCoprocessorEnvironment env, long timeMaxInterval) { + this.env = env; + this.timeMaxInterval = timeMaxInterval; + } + + @Override + public void run() { + PhoenixConnection connForTask = null; + try { + String taskQuery = "SELECT " + + PhoenixDatabaseMetaData.TASK_TS + ", " + + PhoenixDatabaseMetaData.TENANT_ID + ", " + + PhoenixDatabaseMetaData.TABLE_SCHEM + ", " + + PhoenixDatabaseMetaData.TABLE_NAME + + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + + " WHERE "+ PhoenixDatabaseMetaData.TASK_TYPE + " = " + PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue(); + + connForTask = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + PreparedStatement taskStatement = connForTask.prepareStatement(taskQuery); + ResultSet rs = taskStatement.executeQuery(); + while (rs.next()) { + try { + // delete child views only if the parent table is deleted from the system catalog + Properties tenantProps = new Properties(); + if (rs.getString(3) != null) { + tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, rs.getString(3)); + } + PhoenixConnection pconn = QueryUtil.getConnectionOnServer(tenantProps, env.getConfiguration()).unwrap(PhoenixConnection.class); + + MetaDataProtocol.MetaDataMutationResult result = new MetaDataClient(pconn).updateCache(pconn.getTenantId(), + rs.getString(3), rs.getString(4), true); + if (result.getMutationCode() != MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) { + MetaDataEndpointImpl.dropChildViews(env, rs.getBytes(2), rs.getBytes(3), rs.getBytes(4)); + } else if (System.currentTimeMillis() < timeMaxInterval + rs.getTimestamp(1).getTime()) { + // skip this task as it has not been expired and its parent table has not been dropped yet --- End diff -- You can also format the log string with `String.format`. > Cleanup metadata of child views for a base table that has been dropped > ---------------------------------------------------------------------- > > Key: PHOENIX-4764 > URL: https://issues.apache.org/jira/browse/PHOENIX-4764 > Project: Phoenix > Issue Type: Sub-task > Reporter: Thomas D'Silva > Assignee: Kadir OZDEMIR > Priority: Major > > When we drop a base table, we no longer drop all the child view metadata. > Clean up the child view metadata during compaction. > If we try to recreate a base table that was previously dropped but whose > child view metadata wasn't cleaned up throw an exception. Add a test for > this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)