[
https://issues.apache.org/jira/browse/PHOENIX-4764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643895#comment-16643895
]
ASF GitHub Bot commented on PHOENIX-4764:
-----------------------------------------
Github user kadirozde commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/359#discussion_r223783140
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java ---
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.TaskType;
+
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+
+/**
+ * Coprocessor for task related operations. This coprocessor is only registered
+ * on the SYSTEM.TASK table.
+ */
+@SuppressWarnings("deprecation")
+public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
+ public static final Log LOG = LogFactory.getLog(TaskRegionObserver.class);
+ protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(TaskType.values().length);
+ private long timeInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL;
+ private long timeMaxInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL;
+ @GuardedBy("TaskRegionObserver.class")
+ // Added for test purposes
+ private long initialDelay = 0;
+
+ @Override
+ public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
+ boolean abortRequested) {
+ executor.shutdownNow();
+ GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll();
+ }
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ // sleep a little to compensate for clock skew when SYSTEM.CATALOG moves
+ // among region servers, because we rely on the server time of the RS that
+ // is hosting SYSTEM.CATALOG
+ Configuration config = env.getConfiguration();
+ long sleepTime = config.getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB,
+ QueryServicesOptions.DEFAULT_CLOCK_SKEW_INTERVAL);
+ try {
+ if(sleepTime > 0) {
+ Thread.sleep(sleepTime);
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ timeInterval =
+ config.getLong(
+ QueryServices.TASK_HANDLING_INTERVAL_ATTRIB,
+ QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL);
+ timeMaxInterval =
+ config.getLong(
+ QueryServices.TASK_HANDLING_MAX_INTERVAL_ATTRIB,
+ QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL);
+ initialDelay =
+ config.getLong(
+ QueryServices.TASK_HANDLING_INITIAL_DELAY_ATTRIB,
+ QueryServicesOptions.DEFAULT_TASK_HANDLING_INITIAL_DELAY);
+ }
+
+ @Override
+ public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
+ final RegionCoprocessorEnvironment env = e.getEnvironment();
+
+ // turn off verbose deprecation logging
+ Logger deprecationLogger = Logger.getLogger("org.apache.hadoop.conf.Configuration.deprecation");
+ if (deprecationLogger != null) {
+ deprecationLogger.setLevel(Level.WARN);
+ }
+ try {
+ Class.forName(PhoenixDriver.class.getName());
+ DropChildViewsTask task = new DropChildViewsTask(e.getEnvironment(), timeMaxInterval);
+ executor.scheduleWithFixedDelay(task, initialDelay, timeInterval, TimeUnit.MILLISECONDS);
+ } catch (ClassNotFoundException ex) {
+ LOG.error("ScheduleTask cannot start!", ex);
+ }
+ }
+
+ public static void addTask(PhoenixConnection conn, TaskType taskType, Timestamp ts, String tenantId, String schemaName, String tableName) throws IOException {
+ try {
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " +
+ PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " VALUES(?,?,?,?,?)");
+ stmt.setByte(1, taskType.getSerializedValue());
+ stmt.setTimestamp(2, ts);
+ if (tenantId != null) {
+ stmt.setString(3, tenantId);
+ } else {
+ stmt.setNull(3, Types.VARCHAR);
+ }
+ stmt.setString(4, schemaName);
+ stmt.setString(5, tableName);
+ stmt.execute();
+ conn.commit();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static void deleteTask(PhoenixConnection conn, TaskType taskType, Timestamp ts, String tenantId, String schemaName, String tableName) throws IOException {
+ try {
+ PreparedStatement stmt = conn.prepareStatement("DELETE FROM " +
+ PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+ " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND " +
+ PhoenixDatabaseMetaData.TASK_TS + " = ? AND " +
+ PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? " IS NULL " : " = '" + tenantId + "'") + " AND " +
+ PhoenixDatabaseMetaData.TABLE_SCHEM + "= ? AND " +
+ PhoenixDatabaseMetaData.TABLE_NAME + " = ?");
+ stmt.setByte(1, taskType.getSerializedValue());
+ stmt.setTimestamp(2, ts);
+ stmt.setString(3, schemaName);
+ stmt.setString(4, tableName);
+ stmt.execute();
+ conn.commit();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Task that runs periodically to clean up tasks for child views whose parent has been dropped
+ *
+ */
+ public static class DropChildViewsTask extends TimerTask {
+ private RegionCoprocessorEnvironment env;
+ private long timeMaxInterval;
+
+ public DropChildViewsTask(RegionCoprocessorEnvironment env, long timeMaxInterval) {
+ this.env = env;
+ this.timeMaxInterval = timeMaxInterval;
+ }
+
+ @Override
+ public void run() {
+ PhoenixConnection connForTask = null;
+ try {
+ String taskQuery = "SELECT " +
+ PhoenixDatabaseMetaData.TASK_TS + ", " +
+ PhoenixDatabaseMetaData.TENANT_ID + ", " +
+ PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
+ PhoenixDatabaseMetaData.TABLE_NAME +
+ " FROM " +
PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+ " WHERE "+ PhoenixDatabaseMetaData.TASK_TYPE + " =
" + PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue();
+
+ connForTask = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+ PreparedStatement taskStatement = connForTask.prepareStatement(taskQuery);
+ ResultSet rs = taskStatement.executeQuery();
+ while (rs.next()) {
+ try {
+ // delete child views only if the parent table is deleted from the system catalog
+ Properties tenantProps = new Properties();
+ if (rs.getString(3) != null) {
+ tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, rs.getString(3));
+ }
+ PhoenixConnection pconn = QueryUtil.getConnectionOnServer(tenantProps, env.getConfiguration()).unwrap(PhoenixConnection.class);
+
+ MetaDataProtocol.MetaDataMutationResult result = new MetaDataClient(pconn).updateCache(pconn.getTenantId(),
+ rs.getString(3), rs.getString(4), true);
+ if (result.getMutationCode() != MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) {
+ MetaDataEndpointImpl.dropChildViews(env, rs.getBytes(2), rs.getBytes(3), rs.getBytes(4));
+ } else if (System.currentTimeMillis() < timeMaxInterval + rs.getTimestamp(1).getTime()) {
+ // skip this task since it has not yet expired and its parent table has not been dropped
--- End diff --
I am using mostly strings here, so simple string concatenation works. I
will consider it next time.
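For reference, a rough sketch of what binding the tenant id as a parameter could look like (it reuses the column constants and method arguments from the diff above; purely illustrative, not part of this patch):

    // Illustrative alternative only: bind tenantId instead of concatenating it into the DELETE.
    String sql = "DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
            " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND " +
            PhoenixDatabaseMetaData.TASK_TS + " = ? AND " +
            PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? " IS NULL" : " = ?") + " AND " +
            PhoenixDatabaseMetaData.TABLE_SCHEM + " = ? AND " +
            PhoenixDatabaseMetaData.TABLE_NAME + " = ?";
    try (PreparedStatement stmt = conn.prepareStatement(sql)) {
        int i = 1;
        stmt.setByte(i++, taskType.getSerializedValue());
        stmt.setTimestamp(i++, ts);
        if (tenantId != null) {
            stmt.setString(i++, tenantId); // bound value, no manual quoting needed
        }
        stmt.setString(i++, schemaName);
        stmt.setString(i++, tableName);
        stmt.execute();
        conn.commit();
    }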
> Cleanup metadata of child views for a base table that has been dropped
> ----------------------------------------------------------------------
>
> Key: PHOENIX-4764
> URL: https://issues.apache.org/jira/browse/PHOENIX-4764
> Project: Phoenix
> Issue Type: Sub-task
> Reporter: Thomas D'Silva
> Assignee: Kadir OZDEMIR
> Priority: Major
>
> When we drop a base table, we no longer drop all the child view metadata.
> Clean up the child view metadata during compaction.
> If we try to recreate a base table that was previously dropped but whose
> child view metadata wasn't cleaned up throw an exception. Add a test for
> this.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)