http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
index 0000000,4a97f89..267c9e8
mode 000000,100644..100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
@@@ -1,0 -1,155 +1,162 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.hadoop.hive.metastore.model;
+
+ import java.util.List;
+ import java.util.Map;
+
+ public class MPartition {
+
+ private String partitionName; // partitionname ==> (key=value/)*(key=value)
+ private MTable table;
+ private List<String> values;
+ private int createTime;
+ private int lastAccessTime;
+ private MStorageDescriptor sd;
+ private Map<String, String> parameters;
-
++ private long writeId;
+
+ public MPartition() {}
+
+ /**
+ * @param partitionName
+ * @param table
+ * @param values
+ * @param createTime
+ * @param lastAccessTime
+ * @param sd
+ * @param parameters
+ */
+ public MPartition(String partitionName, MTable table, List<String> values, int createTime,
+ int lastAccessTime, MStorageDescriptor sd, Map<String, String> parameters) {
+ this.partitionName = partitionName;
+ this.table = table;
+ this.values = values;
+ this.createTime = createTime;
+ this.lastAccessTime = lastAccessTime;
+ this.sd = sd;
+ this.parameters = parameters;
+ }
+
+ /**
+ * @return the lastAccessTime
+ */
+ public int getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ /**
+ * @param lastAccessTime the lastAccessTime to set
+ */
+ public void setLastAccessTime(int lastAccessTime) {
+ this.lastAccessTime = lastAccessTime;
+ }
+
+ /**
+ * @return the values
+ */
+ public List<String> getValues() {
+ return values;
+ }
+
+ /**
+ * @param values the values to set
+ */
+ public void setValues(List<String> values) {
+ this.values = values;
+ }
+
+ /**
+ * @return the table
+ */
+ public MTable getTable() {
+ return table;
+ }
+
+ /**
+ * @param table the table to set
+ */
+ public void setTable(MTable table) {
+ this.table = table;
+ }
+
+ /**
+ * @return the sd
+ */
+ public MStorageDescriptor getSd() {
+ return sd;
+ }
+
+ /**
+ * @param sd the sd to set
+ */
+ public void setSd(MStorageDescriptor sd) {
+ this.sd = sd;
+ }
+
+ /**
+ * @return the parameters
+ */
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ /**
+ * @param parameters the parameters to set
+ */
+ public void setParameters(Map<String, String> parameters) {
+ this.parameters = parameters;
+ }
+
+ /**
+ * @return the partitionName
+ */
+ public String getPartitionName() {
+ return partitionName;
+ }
+
+ /**
+ * @param partitionName the partitionName to set
+ */
+ public void setPartitionName(String partitionName) {
+ this.partitionName = partitionName;
+ }
+
+ /**
+ * @return the createTime
+ */
+ public int getCreateTime() {
+ return createTime;
+ }
+
+ /**
+ * @param createTime the createTime to set
+ */
+ public void setCreateTime(int createTime) {
+ this.createTime = createTime;
+ }
+
++ public long getWriteId() {
++ return writeId;
++ }
++
++ public void setWriteId(long writeId) {
++ this.writeId = writeId;
++ }
+ }
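Both metastore model classes touched by this commit gain a plain writeId field with a getter and a setter: MPartition above, and MTable in the next file. The sketch below is purely illustrative and not part of the patch; it only exercises the default constructor and the new accessors shown above, and assumes the backing WRITE_ID columns (added with DEFAULT 0 to the Derby test schema later in this commit) leave pre-existing rows at write ID 0.

// Illustrative round-trip of the new accessor; not part of the patch.
import org.apache.hadoop.hive.metastore.model.MPartition;

public class MPartitionWriteIdCheck {
  public static void main(String[] args) {
    MPartition part = new MPartition();
    // An unset model object reports 0, matching the WRITE_ID BIGINT DEFAULT 0
    // columns created for the embedded Derby test schema further down in this commit.
    System.out.println("default writeId = " + part.getWriteId());
    part.setWriteId(42L); // 42 is an arbitrary, hypothetical write ID
    System.out.println("updated writeId = " + part.getWriteId());
  }
}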
http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java ---------------------------------------------------------------------- diff --cc standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java index 0000000,38ad479..deeb971 mode 000000,100644..100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java @@@ -1,0 -1,273 +1,283 @@@ ++ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.hadoop.hive.metastore.model; + + import java.util.List; + import java.util.Map; + + public class MTable { + + private String tableName; + private MDatabase database; + private MStorageDescriptor sd; + private String owner; + private String ownerType; + private int createTime; + private int lastAccessTime; + private int retention; + private List<MFieldSchema> partitionKeys; + private Map<String, String> parameters; + private String viewOriginalText; + private String viewExpandedText; + private boolean rewriteEnabled; + private String tableType; ++ private long writeId; + + public MTable() {} + + /** + * @param tableName + * @param database + * @param sd + * @param owner + * @param ownerType + * @param createTime + * @param lastAccessTime + * @param retention + * @param partitionKeys + * @param parameters + * @param viewOriginalText + * @param viewExpandedText + * @param tableType + */ + public MTable(String tableName, MDatabase database, MStorageDescriptor sd, String owner, String ownerType, + int createTime, int lastAccessTime, int retention, List<MFieldSchema> partitionKeys, + Map<String, String> parameters, String viewOriginalText, String viewExpandedText, + boolean rewriteEnabled, String tableType) { + this.tableName = tableName; + this.database = database; + this.sd = sd; + this.owner = owner; + this.ownerType = ownerType; + this.createTime = createTime; + this.setLastAccessTime(lastAccessTime); + this.retention = retention; + this.partitionKeys = partitionKeys; + this.parameters = parameters; + this.viewOriginalText = viewOriginalText; + this.viewExpandedText = viewExpandedText; + this.rewriteEnabled = rewriteEnabled; + this.tableType = tableType; + } + + /** + * @return the tableName + */ + public String getTableName() { + return tableName; + } + + /** + * @param tableName the tableName to set + */ + public void setTableName(String tableName) { + this.tableName = tableName; + } + + /** + * @return the sd + */ + public MStorageDescriptor getSd() { + return sd; + } + + /** + * @param sd the sd to set + */ + public void 
setSd(MStorageDescriptor sd) { + this.sd = sd; + } + + /** + * @return the partKeys + */ + public List<MFieldSchema> getPartitionKeys() { + return partitionKeys; + } + + /** + * @param partKeys the partKeys to set + */ + public void setPartitionKeys(List<MFieldSchema> partKeys) { + this.partitionKeys = partKeys; + } + + /** + * @return the parameters + */ + public Map<String, String> getParameters() { + return parameters; + } + + /** + * @param parameters the parameters to set + */ + public void setParameters(Map<String, String> parameters) { + this.parameters = parameters; + } + + /** + * @return the original view text, or null if this table is not a view + */ + public String getViewOriginalText() { + return viewOriginalText; + } + + /** + * @param viewOriginalText the original view text to set + */ + public void setViewOriginalText(String viewOriginalText) { + this.viewOriginalText = viewOriginalText; + } + + /** + * @return the expanded view text, or null if this table is not a view + */ + public String getViewExpandedText() { + return viewExpandedText; + } + + /** + * @param viewExpandedText the expanded view text to set + */ + public void setViewExpandedText(String viewExpandedText) { + this.viewExpandedText = viewExpandedText; + } + + /** + * @return whether the view can be used for rewriting queries + */ + public boolean isRewriteEnabled() { + return rewriteEnabled; + } + + /** + * @param rewriteEnabled whether the view can be used for rewriting queries + */ + public void setRewriteEnabled(boolean rewriteEnabled) { + this.rewriteEnabled = rewriteEnabled; + } + + /** + * @return the owner + */ + public String getOwner() { + return owner; + } + + /** + * @param owner the owner to set + */ + public void setOwner(String owner) { + this.owner = owner; + } + + /** + * @return the owner type + */ + public String getOwnerType() { + return ownerType; + } + + /** + * @param ownerType the owner type to set + */ + public void setOwnerType(String ownerType) { + this.ownerType = ownerType; + } + + /** + * @return the createTime + */ + public int getCreateTime() { + return createTime; + } + + /** + * @param createTime the createTime to set + */ + public void setCreateTime(int createTime) { + this.createTime = createTime; + } + + /** + * @return the database + */ + public MDatabase getDatabase() { + return database; + } + + /** + * @param database the database to set + */ + public void setDatabase(MDatabase database) { + this.database = database; + } + + /** + * @return the retention + */ + public int getRetention() { + return retention; + } + + /** + * @param retention the retention to set + */ + public void setRetention(int retention) { + this.retention = retention; + } + + /** + * @param lastAccessTime the lastAccessTime to set + */ + public void setLastAccessTime(int lastAccessTime) { + this.lastAccessTime = lastAccessTime; + } + + /** + * @return the lastAccessTime + */ + public int getLastAccessTime() { + return lastAccessTime; + } + + /** + * @param tableType the tableType to set + */ + public void setTableType(String tableType) { + this.tableType = tableType; + } + + /** + * @return the tableType + */ + public String getTableType() { + return tableType; + } ++ ++ public long getWriteId() { ++ return writeId; ++ } ++ ++ public void setWriteId(long writeId) { ++ this.writeId = writeId; ++ } + } http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java 
---------------------------------------------------------------------- diff --cc standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 0000000,4e3068d..1f559e9 mode 000000,100644..100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@@ -1,0 -1,1107 +1,1158 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hive.metastore.txn; + ++import org.apache.hadoop.hive.common.StatsSetupConst; + import org.apache.hadoop.hive.common.classification.RetrySemantics; + import org.apache.hadoop.hive.metastore.api.CompactionType; + import org.apache.hadoop.hive.metastore.api.MetaException; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; + import org.apache.hadoop.util.StringUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.sql.Connection; + import java.sql.PreparedStatement; + import java.sql.ResultSet; + import java.sql.SQLException; + import java.sql.Statement; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + + /** + * Extends the transaction handler with methods needed only by the compactor threads. These + * methods are not available through the thrift interface. + */ + class CompactionTxnHandler extends TxnHandler { + static final private String CLASS_NAME = CompactionTxnHandler.class.getName(); + static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + public CompactionTxnHandler() { + } + + /** + * This will look through the completed_txn_components table and look for partitions or tables + * that may be ready for compaction. Also, look through txns and txn_components tables for + * aborted transactions that we should add to the list. + * @param maxAborted Maximum number of aborted queries to allow before marking this as a + * potential compaction. + * @return list of CompactionInfo structs. These will not have id, type, + * or runAs set since these are only potential compactions not actual ones. 
+ */ + @Override + @RetrySemantics.ReadOnly + public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException { + Connection dbConn = null; + Set<CompactionInfo> response = new HashSet<>(); + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + // Check for completed transactions + String s = "select distinct ctc_database, ctc_table, " + + "ctc_partition from COMPLETED_TXN_COMPONENTS"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.dbname = rs.getString(1); + info.tableName = rs.getString(2); + info.partName = rs.getString(3); + response.add(info); + } + rs.close(); + + // Check for aborted txns + s = "select tc_database, tc_table, tc_partition " + + "from TXNS, TXN_COMPONENTS " + + "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + + "group by tc_database, tc_table, tc_partition " + + "having count(*) > " + maxAborted; + + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.dbname = rs.getString(1); + info.tableName = rs.getString(2); + info.partName = rs.getString(3); + info.tooManyAborts = true; + response.add(info); + } + + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e) { + LOG.error("Unable to connect to transaction database " + e.getMessage()); + checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")"); + } finally { + close(rs, stmt, dbConn); + } + return response; + } + catch (RetryException e) { + return findPotentialCompactions(maxAborted); + } + } + + /** + * Sets the user to run as. This is for the case + * where the request was generated by the user and so the worker must set this value later. + * @param cq_id id of this entry in the queue + * @param user user to run the jobs as + */ + @Override + @RetrySemantics.Idempotent + public void setRunAs(long cq_id, String user) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; + LOG.debug("Going to execute update <" + s + ">"); + int updCnt = stmt.executeUpdate(s); + if (updCnt != 1) { + LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ". updCnt=" + updCnt); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to update compaction queue, " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "setRunAs(cq_id:" + cq_id + ",user:" + user +")"); + } finally { + closeDbConn(dbConn); + closeStmt(stmt); + } + } catch (RetryException e) { + setRunAs(cq_id, user); + } + } + + /** + * This will grab the next compaction request off of + * the queue, and assign it to the worker. + * @param workerId id of the worker calling this, will be recorded in the db + * @return an info element for this compaction request, or null if there is no work to do now. 
+ */ + @Override + @RetrySemantics.SafeToRetry + public CompactionInfo findNextToCompact(String workerId) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725) + Statement updStmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select cq_id, cq_database, cq_table, cq_partition, " + + "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("No compactions found ready to compact"); + dbConn.rollback(); + return null; + } + updStmt = dbConn.createStatement(); + do { + CompactionInfo info = new CompactionInfo(); + info.id = rs.getLong(1); + info.dbname = rs.getString(2); + info.tableName = rs.getString(3); + info.partName = rs.getString(4); + info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); + info.properties = rs.getString(6); + // Now, update this record as being worked on by this worker. + long now = getDbTime(dbConn); + s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id + + " AND cq_state='" + INITIATED_STATE + "'"; + LOG.debug("Going to execute update <" + s + ">"); + int updCount = updStmt.executeUpdate(s); + if(updCount == 1) { + dbConn.commit(); + return info; + } + if(updCount == 0) { + LOG.debug("Another Worker picked up " + info); + continue; + } + LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " + + info + ". updCnt=" + updCount + "."); + dbConn.rollback(); + return null; + } while( rs.next()); + dbConn.rollback(); + return null; + } catch (SQLException e) { + LOG.error("Unable to select next element for compaction, " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findNextToCompact(workerId:" + workerId + ")"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeStmt(updStmt); + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return findNextToCompact(workerId); + } + } + + /** + * This will mark an entry in the queue as compacted + * and put it in the ready to clean state. + * @param info info on the compaction entry to mark as compacted. + */ + @Override + @RetrySemantics.SafeToRetry + public void markCompacted(CompactionInfo info) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + + "cq_worker_id = null where cq_id = " + info.id; + LOG.debug("Going to execute update <" + s + ">"); + int updCnt = stmt.executeUpdate(s); + if (updCnt != 1) { + LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". 
updCnt=" + updCnt); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to update compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "markCompacted(" + info + ")"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + markCompacted(info); + } + } + + /** + * Find entries in the queue that are ready to + * be cleaned. + * @return information on the entry in the queue. + */ + @Override + @RetrySemantics.ReadOnly + public List<CompactionInfo> findReadyToClean() throws MetaException { + Connection dbConn = null; + List<CompactionInfo> rc = new ArrayList<>(); + + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select cq_id, cq_database, cq_table, cq_partition, " + + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '" + + READY_FOR_CLEANING + "'"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.id = rs.getLong(1); + info.dbname = rs.getString(2); + info.tableName = rs.getString(3); + info.partName = rs.getString(4); + switch (rs.getString(5).charAt(0)) { + case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; + case MINOR_TYPE: info.type = CompactionType.MINOR; break; + default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + } + info.runAs = rs.getString(6); + info.highestWriteId = rs.getLong(7); + rc.add(info); + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + return rc; + } catch (SQLException e) { + LOG.error("Unable to select next element for cleaning, " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findReadyToClean"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return findReadyToClean(); + } + } + + /** + * This will remove an entry from the queue after + * it has been compacted. 
+ * + * @param info info on the compaction entry to remove + */ + @Override + @RetrySemantics.CannotRetry + public void markCleaned(CompactionInfo info) throws MetaException { + try { + Connection dbConn = null; + PreparedStatement pStmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt.setLong(1, info.id); + rs = pStmt.executeQuery(); + if(rs.next()) { + info = CompactionInfo.loadFullFromCompactionQueue(rs); + } + else { + throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE"); + } + close(rs); + String s = "delete from COMPACTION_QUEUE where cq_id = ?"; + pStmt = dbConn.prepareStatement(s); + pStmt.setLong(1, info.id); + LOG.debug("Going to execute update <" + s + ">"); + int updCount = pStmt.executeUpdate(); + if (updCount != 1) { + LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); + info.state = SUCCEEDED_STATE; + CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); + updCount = pStmt.executeUpdate(); + + // Remove entries from completed_txn_components as well, so we don't start looking there + // again but only up to the highest write ID include in this compaction job. + //highestWriteId will be NULL in upgrade scenarios + s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " + + "ctc_table = ?"; + if (info.partName != null) { + s += " and ctc_partition = ?"; + } + if(info.highestWriteId != 0) { + s += " and ctc_writeid <= ?"; + } + pStmt = dbConn.prepareStatement(s); + int paramCount = 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); + if (info.partName != null) { + pStmt.setString(paramCount++, info.partName); + } + if(info.highestWriteId != 0) { + pStmt.setLong(paramCount++, info.highestWriteId); + } + LOG.debug("Going to execute update <" + s + ">"); + if (pStmt.executeUpdate() < 1) { + LOG.error("Expected to remove at least one row from completed_txn_components when " + + "marking compaction entry as clean!"); + } + + s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + + TXN_ABORTED + "' and tc_database = ? 
and tc_table = ?"; + if (info.highestWriteId != 0) s += " and tc_writeid <= ?"; + if (info.partName != null) s += " and tc_partition = ?"; + + pStmt = dbConn.prepareStatement(s); + paramCount = 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); + if(info.highestWriteId != 0) { + pStmt.setLong(paramCount++, info.highestWriteId); + } + if (info.partName != null) { + pStmt.setString(paramCount++, info.partName); + } + + LOG.debug("Going to execute update <" + s + ">"); + rs = pStmt.executeQuery(); + List<Long> txnids = new ArrayList<>(); + List<String> questions = new ArrayList<>(); + while (rs.next()) { + long id = rs.getLong(1); + txnids.add(id); + questions.add("?"); + } + // Remove entries from txn_components, as there may be aborted txn components + if (txnids.size() > 0) { + List<String> queries = new ArrayList<>(); + + // Prepare prefix and suffix + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + prefix.append("delete from TXN_COMPONENTS where "); + + //because 1 txn may include different partitions/tables even in auto commit mode + suffix.append(" and tc_database = ?"); + suffix.append(" and tc_table = ?"); + if (info.partName != null) { + suffix.append(" and tc_partition = ?"); + } + + // Populate the complete query with provided prefix and suffix + List<Integer> counts = TxnUtils + .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid", + true, false); + int totalCount = 0; + for (int i = 0; i < queries.size(); i++) { + String query = queries.get(i); + int insertCount = counts.get(i); + + LOG.debug("Going to execute update <" + query + ">"); + pStmt = dbConn.prepareStatement(query); + for (int j = 0; j < insertCount; j++) { + pStmt.setLong(j + 1, txnids.get(totalCount + j)); + } + totalCount += insertCount; + paramCount = insertCount + 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); + if (info.partName != null) { + pStmt.setString(paramCount++, info.partName); + } + int rc = pStmt.executeUpdate(); + LOG.debug("Removed " + rc + " records from txn_components"); + + // Don't bother cleaning from the txns table. A separate call will do that. We don't + // know here which txns still have components from other tables or partitions in the + // table, so we don't know which ones we can and cannot clean. + } + } + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to delete from compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "markCleaned(" + info + ")"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, pStmt, dbConn); + } + } catch (RetryException e) { + markCleaned(info); + } + } + + /** + * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by + * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)). + */ + @Override + @RetrySemantics.SafeToRetry + public void cleanTxnToWriteIdTable() throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + + try { + // We query for minimum values in all the queries and they can only increase by any concurrent + // operations. So, READ COMMITTED is sufficient. 
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + // First need to find the min_uncommitted_txnid which is currently seen by any open transactions. + // If there are no txns which are currently open or aborted in the system, then current value of + // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid. + String s = "select ntxn_next from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_txn_id"); + } + long minUncommittedTxnId = rs.getLong(1); + + // If there are any open txns, then the minimum of min_open_txnid from MIN_HISTORY_LEVEL table + // could be the min_uncommitted_txnid if lesser than NEXT_TXN_ID.ntxn_next. + s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (rs.next()) { + long minOpenTxnId = rs.getLong(1); + if (minOpenTxnId > 0) { + minUncommittedTxnId = Math.min(minOpenTxnId, minUncommittedTxnId); + } + } + + // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid + // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid). + s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (rs.next()) { + long minAbortedTxnId = rs.getLong(1); + if (minAbortedTxnId > 0) { + minUncommittedTxnId = Math.min(minAbortedTxnId, minUncommittedTxnId); + } + } + + // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed + // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. + s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId; + LOG.debug("Going to execute delete <" + s + ">"); + int rc = stmt.executeUpdate(s); + LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId); + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to delete from txns table " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "cleanTxnToWriteIdTable"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + cleanTxnToWriteIdTable(); + } + } + + /** + * Clean up aborted transactions from txns that have no components in txn_components. The reason such + * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and + * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. + */ + @Override + @RetrySemantics.SafeToRetry + public void cleanEmptyAbortedTxns() throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + //Aborted is a terminal state, so nothing about the txn can change + //after that, so READ COMMITTED is sufficient. 
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select txn_id from TXNS where " + - "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + - "txn_state = '" + TXN_ABORTED + "'"; ++ "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + ++ "txn_state = '" + TXN_ABORTED + "'"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + List<Long> txnids = new ArrayList<>(); + while (rs.next()) txnids.add(rs.getLong(1)); + close(rs); + if(txnids.size() <= 0) { + return; + } + Collections.sort(txnids);//easier to read logs ++ + List<String> queries = new ArrayList<>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + ++ // Turn off COLUMN_STATS_ACCURATE for txnids' components in TBLS and PARTITIONS ++ prefix.append("select tbl_id from TBLS inner join DBS on TBLS.DB_ID = DBS.DB_ID " ++ + "inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME and t2w_table = TBLS.TBL_NAME" ++ + " and t2w_writeid = TBLS.WRITE_ID where "); ++ suffix.append(""); ++ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "t2w_txnid", true, false); ++ ++ // Delete COLUMN_STATS_ACCURATE.BASIC_STATS rows from TABLE_PARAMS for the txnids. ++ List<StringBuilder> finalCommands = new ArrayList<>(queries.size()); ++ for (int i = 0; i < queries.size(); i++) { ++ String query = queries.get(i); ++ finalCommands.add(i, new StringBuilder("delete from TABLE_PARAMS " + ++ " where param_key = '" + "COLUMN_STATS_ACCURATE" + "' and tbl_id in (")); ++ finalCommands.get(i).append(query + ")"); ++ LOG.debug("Going to execute update <" + finalCommands.get(i) + ">"); ++ int rc = stmt.executeUpdate(finalCommands.get(i).toString()); ++ LOG.info("Turned off " + rc + " COLUMN_STATE_ACCURATE.BASIC_STATS states from TBLS"); ++ } ++ ++ queries.clear(); ++ prefix.setLength(0); ++ suffix.setLength(0); ++ finalCommands.clear(); ++ ++ // Delete COLUMN_STATS_ACCURATE.BASIC_STATS rows from PARTITIONS_PARAMS for the txnids. ++ prefix.append("select part_id from PARTITIONS " ++ + "inner join TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID " ++ + "inner join DBS on TBLS.DB_ID = DBS.DB_ID " ++ + "inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME and t2w_table = TBLS.TBL_NAME" ++ + " and t2w_writeid = TBLS.WRITE_ID where "); ++ suffix.append(""); ++ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "t2w_txnid", true, false); ++ ++ for (int i = 0; i < queries.size(); i++) { ++ String query = queries.get(i); ++ finalCommands.add(i, new StringBuilder("delete from PARTITION_PARAMS " + ++ " where param_key = '" + "COLUMN_STATS_ACCURATE" + "' and part_id in (")); ++ finalCommands.get(i).append(query + ")"); ++ LOG.debug("Going to execute update <" + finalCommands.get(i) + ">"); ++ int rc = stmt.executeUpdate(finalCommands.get(i).toString()); ++ LOG.info("Turned off " + rc + " COLUMN_STATE_ACCURATE.BASIC_STATS states from PARTITIONS"); ++ } ++ ++ queries.clear(); ++ prefix.setLength(0); ++ suffix.setLength(0); ++ finalCommands.clear(); ++ ++ // Delete from TXNS. 
+ prefix.append("delete from TXNS where "); + suffix.append(""); + + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false); + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + int rc = stmt.executeUpdate(query); + LOG.info("Removed " + rc + " empty Aborted transactions from TXNS"); + } + LOG.info("Aborted transactions removed from TXNS: " + txnids); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to delete from txns table " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "cleanEmptyAbortedTxns"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + cleanEmptyAbortedTxns(); + } + } + + /** + * This will take all entries assigned to workers + * on a host return them to INITIATED state. The initiator should use this at start up to + * clean entries from any workers that were in the middle of compacting when the metastore + * shutdown. It does not reset entries from worker threads on other hosts as those may still + * be working. + * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, + * so that like hostname% will match the worker id. + */ + @Override + @RetrySemantics.Idempotent + public void revokeFromLocalWorkers(String hostname) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" + + hostname + "%'"; + LOG.debug("Going to execute update <" + s + ">"); + // It isn't an error if the following returns no rows, as the local workers could have died + // with nothing assigned to them. + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to change dead worker's records back to initiated state " + + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + hostname +")"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + revokeFromLocalWorkers(hostname); + } + } + + /** + * This call will return all compaction queue + * entries assigned to a worker but over the timeout back to the initiated state. + * This should be called by the initiator on start up and occasionally when running to clean up + * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called + * first. + * @param timeout number of milliseconds since start time that should elapse before a worker is + * declared dead. 
+ */ + @Override + @RetrySemantics.Idempotent + public void revokeTimedoutWorkers(long timeout) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + long latestValidStart = getDbTime(dbConn) - timeout; + stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " + + latestValidStart; + LOG.debug("Going to execute update <" + s + ">"); + // It isn't an error if the following returns no rows, as the local workers could have died + // with nothing assigned to them. + stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to change dead worker's records back to initiated state " + + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout + ")"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + revokeTimedoutWorkers(timeout); + } + } + + /** + * Queries metastore DB directly to find columns in the table which have statistics information. + * If {@code ci} includes partition info then per partition stats info is examined, otherwise + * table level stats are examined. + * @throws MetaException + */ + @Override + @RetrySemantics.ReadOnly + public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException { + Connection dbConn = null; + PreparedStatement pStmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + String quote = getIdentifierQuoteString(dbConn); + StringBuilder bldr = new StringBuilder(); + bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) + .append(" FROM ") + .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")) + .append(quote) + .append(" WHERE ") + .append(quote).append("DB_NAME").append(quote).append(" = ?") + .append(" AND ").append(quote).append("TABLE_NAME").append(quote) + .append(" = ?"); + if (ci.partName != null) { + bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?"); + } + String s = bldr.toString(); + pStmt = dbConn.prepareStatement(s); + pStmt.setString(1, ci.dbname); + pStmt.setString(2, ci.tableName); + if (ci.partName != null) { + pStmt.setString(3, ci.partName); + } + + /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : + "PART_COL_STATS") + + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'" + + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/ + LOG.debug("Going to execute <" + s + ">"); + rs = pStmt.executeQuery(); + List<String> columns = new ArrayList<>(); + while (rs.next()) { + columns.add(rs.getString(1)); + } + LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName + + (ci.partName == null ? "" : "/" + ci.partName)); + dbConn.commit(); + return columns; + } catch (SQLException e) { + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName + + (ci.partName == null ? 
"" : "/" + ci.partName) + ")"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, pStmt, dbConn); + } + } catch (RetryException ex) { + return findColumnsWithStats(ci); + } + } + + /** + * Record the highest txn id that the {@code ci} compaction job will pay attention to. + * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids. + */ + @Override + @RetrySemantics.Idempotent + public void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException { + Connection dbConn = null; + Statement stmt = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + highestWriteId + + " WHERE CQ_ID = " + ci.id); + if(updCount != 1) { + throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci); + } + dbConn.commit(); + } catch (SQLException e) { + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "setCompactionHighestWriteId(" + ci + "," + highestWriteId + ")"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + } + } catch (RetryException ex) { + setCompactionHighestWriteId(ci, highestWriteId); + } + } + private static class RetentionCounters { + int attemptedRetention = 0; + int failedRetention = 0; + int succeededRetention = 0; + RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) { + this.attemptedRetention = attemptedRetention; + this.failedRetention = failedRetention; + this.succeededRetention = succeededRetention; + } + } + private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) { + switch (ci.state) { + case ATTEMPTED_STATE: + if(--rc.attemptedRetention < 0) { + deleteSet.add(ci.id); + } + break; + case FAILED_STATE: + if(--rc.failedRetention < 0) { + deleteSet.add(ci.id); + } + break; + case SUCCEEDED_STATE: + if(--rc.succeededRetention < 0) { + deleteSet.add(ci.id); + } + break; + default: + //do nothing to hanlde future RU/D where we may want to add new state types + } + } + + /** + * For any given compactable entity (partition; table if not partitioned) the history of compactions + * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the + * history such that a configurable number of each type of state is present. Any other entries + * can be purged. This scheme has advantage of always retaining the last failure/success even if + * it's not recent. 
+ * @throws MetaException + */ + @Override + @RetrySemantics.SafeToRetry + public void purgeCompactionHistory() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + PreparedStatement pStmt = null; + ResultSet rs = null; + List<Long> deleteSet = new ArrayList<>(); + RetentionCounters rc = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + /*cc_id is monotonically increasing so for any entity sorts in order of compaction history, + thus this query groups by entity and withing group sorts most recent first*/ + rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " + + "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc"); + String lastCompactedEntity = null; + /*In each group, walk from most recent and count occurences of each state type. Once you + * have counted enough (for each state) to satisfy retention policy, delete all other + * instances of this status.*/ + while(rs.next()) { + CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0)); + if(!ci.getFullPartitionName().equals(lastCompactedEntity)) { + lastCompactedEntity = ci.getFullPartitionName(); + rc = new RetentionCounters(MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), + getFailedCompactionRetention(), + MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED)); + } + checkForDeletion(deleteSet, ci, rc); + } + close(rs); + + if (deleteSet.size() <= 0) { + return; + } + + List<String> queries = new ArrayList<>(); + + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + prefix.append("delete from COMPLETED_COMPACTIONS where "); + suffix.append(""); + + List<String> questions = new ArrayList<>(deleteSet.size()); + for (int i = 0; i < deleteSet.size(); i++) { + questions.add("?"); + } + List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false); + int totalCount = 0; + for (int i = 0; i < queries.size(); i++) { + String query = queries.get(i); + long insertCount = counts.get(i); + LOG.debug("Going to execute update <" + query + ">"); + pStmt = dbConn.prepareStatement(query); + for (int j = 0; j < insertCount; j++) { + pStmt.setLong(j + 1, deleteSet.get(totalCount + j)); + } + totalCount += insertCount; + int count = pStmt.executeUpdate(); + LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS"); + } + dbConn.commit(); + } catch (SQLException e) { + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "purgeCompactionHistory()"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + closeStmt(pStmt); + } + } catch (RetryException ex) { + purgeCompactionHistory(); + } + } + /** + * this ensures that the number of failed compaction entries retained is > than number of failed + * compaction threshold which prevents new compactions from being scheduled. 
+ */ + private int getFailedCompactionRetention() { + int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); + int failedRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED); + if(failedRetention < failedThreshold) { + LOG.warn("Invalid configuration " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname() + + "=" + failedRetention + " < " + ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED + "=" + + failedRetention + ". Will use " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname() + + "=" + failedRetention); + failedRetention = failedThreshold; + } + return failedRetention; + } + /** + * Returns {@code true} if there already exists sufficient number of consecutive failures for + * this table/partition so that no new automatic compactions will be scheduled. + * User initiated compactions don't do this check. + * + * Do we allow compacting whole table (when it's partitioned)? No, though perhaps we should. + * That would be a meta operations, i.e. first find all partitions for this table (which have + * txn info) and schedule each compaction separately. This avoids complications in this logic. + */ + @Override + @RetrySemantics.ReadOnly + public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { + Connection dbConn = null; + PreparedStatement pStmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " + + "CC_DATABASE = ? and " + + "CC_TABLE = ? " + + (ci.partName != null ? "and CC_PARTITION = ?" : "") + + " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); + pStmt.setString(1, ci.dbname); + pStmt.setString(2, ci.tableName); + if (ci.partName != null) { + pStmt.setString(3, ci.partName); + } + rs = pStmt.executeQuery(); + int numFailed = 0; + int numTotal = 0; + int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); + while(rs.next() && ++numTotal <= failedThreshold) { + if(rs.getString(1).charAt(0) == FAILED_STATE) { + numFailed++; + } + else { + numFailed--; + } + } + return numFailed == failedThreshold; + } + catch (SQLException e) { + LOG.error("Unable to check for failed compactions " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")"); + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e)); + return false;//weren't able to check + } finally { + close(rs, pStmt, dbConn); + } + } catch (RetryException e) { + return checkFailedCompactions(ci); + } + } + /** + * If there is an entry in compaction_queue with ci.id, remove it + * Make entry in completed_compactions with status 'f'. + * If there is no entry in compaction_queue, it means Initiator failed to even schedule a compaction, + * which we record as ATTEMPTED_STATE entry in history. 
+ */ + @Override + @RetrySemantics.CannotRetry + public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw + //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure + try { + Connection dbConn = null; + Statement stmt = null; + PreparedStatement pStmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt.setLong(1, ci.id); + rs = pStmt.executeQuery(); + if(rs.next()) { + ci = CompactionInfo.loadFullFromCompactionQueue(rs); + String s = "delete from COMPACTION_QUEUE where cq_id = ?"; + pStmt = dbConn.prepareStatement(s); + pStmt.setLong(1, ci.id); + LOG.debug("Going to execute update <" + s + ">"); + int updCnt = pStmt.executeUpdate(); + } + else { + if(ci.id > 0) { + //the record with valid CQ_ID has disappeared - this is a sign of something wrong + throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE"); + } + } + if(ci.id == 0) { + //The failure occurred before we even made an entry in COMPACTION_QUEUE + //generate ID so that we can make an entry in COMPLETED_COMPACTIONS + ci.id = generateCompactionQueueId(stmt); + //mostly this indicates that the Initiator is paying attention to some table even though + //compactions are not happening. + ci.state = ATTEMPTED_STATE; + //this is not strictly accurate, but 'type' cannot be null. + if(ci.type == null) { ci.type = CompactionType.MINOR; } + ci.start = getDbTime(dbConn); + } + else { + ci.state = FAILED_STATE; + } + close(rs, stmt, null); + closeStmt(pStmt); + + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); + CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); + int updCount = pStmt.executeUpdate(); + LOG.debug("Going to commit"); + closeStmt(pStmt); + dbConn.commit(); + } catch (SQLException e) { + LOG.warn("markFailed(" + ci.id + "):" + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + try { + checkRetryable(dbConn, e, "markFailed(" + ci + ")"); + } + catch(MetaException ex) { + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex)); + } + LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e); + } finally { + close(rs, stmt, null); + close(null, pStmt, dbConn); + } + } catch (RetryException e) { + markFailed(ci); + } + } + @Override + @RetrySemantics.Idempotent + public void setHadoopJobId(String hadoopJobId, long id) { + try { + Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id; + LOG.debug("Going to execute <" + s + ">"); + int updateCount = stmt.executeUpdate(s); + LOG.debug("Going to commit"); + closeStmt(stmt); + dbConn.commit(); + } catch (SQLException e) { + LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + 
e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + try { + checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")"); + } + catch(MetaException ex) { + LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex)); + } + LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e); + } finally { + close(null, stmt, dbConn); + } + } catch (RetryException e) { + setHadoopJobId(hadoopJobId, id); + } + } + } + + http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java ---------------------------------------------------------------------- diff --cc standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 0000000,f8c2ca2..319e612 mode 000000,100644..100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@@ -1,0 -1,505 +1,599 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hive.metastore.txn; + + import java.sql.Connection; + import java.sql.Driver; + import java.sql.PreparedStatement; + import java.sql.ResultSet; + import java.sql.ResultSetMetaData; + import java.sql.SQLException; + import java.sql.SQLTransactionRollbackException; + import java.sql.Statement; + import java.util.Properties; + + import com.google.common.annotations.VisibleForTesting; ++import jline.internal.Log; + import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.hive.metastore.api.MetaException; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; ++import org.apache.zookeeper.txn.TxnHeader; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * Utility methods for creating and destroying txn database/schema, plus methods for + * querying against metastore tables. + * Placed here in a separate class so it can be shared across unit tests. 
+ */ + public final class TxnDbUtil { + + static final private Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName()); + private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; + + private static int deadlockCnt = 0; + + private TxnDbUtil() { + throw new UnsupportedOperationException("Can't initialize class"); + } + + /** + * Set up the configuration so it will use the DbTxnManager, concurrency will be set to true, + * and the JDBC configs will be set for putting the transaction and lock info in the embedded + * metastore. + * + * @param conf HiveConf to add these values to + */ + public static void setConfValues(Configuration conf) { + MetastoreConf.setVar(conf, ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER); + MetastoreConf.setBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + } + + public static void prepDb(Configuration conf) throws Exception { + // This is a bogus hack because it copies the contents of the SQL file + // intended for creating derby databases, and thus will inexorably get + // out of date with it. I'm open to any suggestions on how to make this + // read the file in a build friendly way. + + Connection conn = null; + Statement stmt = null; + try { + conn = getConnection(conf); + stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TXNS (" + + " TXN_ID bigint PRIMARY KEY," + + " TXN_STATE char(1) NOT NULL," + + " TXN_STARTED bigint NOT NULL," + + " TXN_LAST_HEARTBEAT bigint NOT NULL," + + " TXN_USER varchar(128) NOT NULL," + + " TXN_HOST varchar(128) NOT NULL," + + " TXN_TYPE integer)"); + + stmt.execute("CREATE TABLE TXN_COMPONENTS (" + + " TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID)," + + " TC_DATABASE varchar(128) NOT NULL," + + " TC_TABLE varchar(128)," + + " TC_PARTITION varchar(767)," + + " TC_OPERATION_TYPE char(1) NOT NULL," + + " TC_WRITEID bigint)"); + stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" + + " CTC_TXNID bigint NOT NULL," + + " CTC_DATABASE varchar(128) NOT NULL," + + " CTC_TABLE varchar(128)," + + " CTC_PARTITION varchar(767)," + + " CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL," + + " CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," + + " CTC_WRITEID bigint)"); + stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)"); + stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)"); + + stmt.execute("CREATE TABLE TXN_TO_WRITE_ID (" + + " T2W_TXNID bigint NOT NULL," + + " T2W_DATABASE varchar(128) NOT NULL," + + " T2W_TABLE varchar(256) NOT NULL," + + " T2W_WRITEID bigint NOT NULL)"); + stmt.execute("CREATE TABLE NEXT_WRITE_ID (" + + " NWI_DATABASE varchar(128) NOT NULL," + + " NWI_TABLE varchar(256) NOT NULL," + + " NWI_NEXT bigint NOT NULL)"); + + stmt.execute("CREATE TABLE MIN_HISTORY_LEVEL (" + + " MHL_TXNID bigint NOT NULL," + + " MHL_MIN_OPEN_TXNID bigint NOT NULL," + + " PRIMARY KEY(MHL_TXNID))"); + + stmt.execute("CREATE TABLE HIVE_LOCKS (" + + " HL_LOCK_EXT_ID bigint NOT NULL," + + " HL_LOCK_INT_ID bigint NOT NULL," + + " HL_TXNID bigint NOT NULL," + + " HL_DB varchar(128) NOT NULL," + + " HL_TABLE varchar(128)," + + " HL_PARTITION varchar(767)," + + " HL_LOCK_STATE char(1) NOT NULL," + + " HL_LOCK_TYPE char(1) NOT NULL," + + " HL_LAST_HEARTBEAT bigint NOT NULL," + + " HL_ACQUIRED_AT bigint," + + " HL_USER varchar(128) NOT NULL," + + " HL_HOST varchar(128) NOT NULL," + + " HL_HEARTBEAT_COUNT integer," + + " HL_AGENT_INFO varchar(128)," + + " HL_BLOCKEDBY_EXT_ID bigint," + + " HL_BLOCKEDBY_INT_ID bigint," + + " PRIMARY 
KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))"); + stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)"); + + stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)"); + stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)"); + + stmt.execute("CREATE TABLE COMPACTION_QUEUE (" + + " CQ_ID bigint PRIMARY KEY," + + " CQ_DATABASE varchar(128) NOT NULL," + + " CQ_TABLE varchar(128) NOT NULL," + + " CQ_PARTITION varchar(767)," + + " CQ_STATE char(1) NOT NULL," + + " CQ_TYPE char(1) NOT NULL," + + " CQ_TBLPROPERTIES varchar(2048)," + + " CQ_WORKER_ID varchar(128)," + + " CQ_START bigint," + + " CQ_RUN_AS varchar(128)," + + " CQ_HIGHEST_WRITE_ID bigint," + + " CQ_META_INFO varchar(2048) for bit data," + + " CQ_HADOOP_JOB_ID varchar(32))"); + + stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); + stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); + + stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" + + " CC_ID bigint PRIMARY KEY," + + " CC_DATABASE varchar(128) NOT NULL," + + " CC_TABLE varchar(128) NOT NULL," + + " CC_PARTITION varchar(767)," + + " CC_STATE char(1) NOT NULL," + + " CC_TYPE char(1) NOT NULL," + + " CC_TBLPROPERTIES varchar(2048)," + + " CC_WORKER_ID varchar(128)," + + " CC_START bigint," + + " CC_END bigint," + + " CC_RUN_AS varchar(128)," + + " CC_HIGHEST_WRITE_ID bigint," + + " CC_META_INFO varchar(2048) for bit data," + + " CC_HADOOP_JOB_ID varchar(32))"); + + stmt.execute("CREATE TABLE AUX_TABLE (" + + " MT_KEY1 varchar(128) NOT NULL," + + " MT_KEY2 bigint NOT NULL," + + " MT_COMMENT varchar(255)," + + " PRIMARY KEY(MT_KEY1, MT_KEY2))"); + + stmt.execute("CREATE TABLE WRITE_SET (" + + " WS_DATABASE varchar(128) NOT NULL," + + " WS_TABLE varchar(128) NOT NULL," + + " WS_PARTITION varchar(767)," + + " WS_TXNID bigint NOT NULL," + + " WS_COMMIT_ID bigint NOT NULL," + + " WS_OPERATION_TYPE char(1) NOT NULL)" + ); + + stmt.execute("CREATE TABLE REPL_TXN_MAP (" + + " RTM_REPL_POLICY varchar(256) NOT NULL, " + + " RTM_SRC_TXN_ID bigint NOT NULL, " + + " RTM_TARGET_TXN_ID bigint NOT NULL, " + + " PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID))" + ); + + try { ++ stmt.execute("CREATE TABLE \"APP\".\"TBLS\" (\"TBL_ID\" BIGINT NOT NULL, " + ++ " \"CREATE_TIME\" INTEGER NOT NULL, \"DB_ID\" BIGINT, \"LAST_ACCESS_TIME\" INTEGER NOT NULL, " + ++ " \"OWNER\" VARCHAR(767), \"OWNER_TYPE\" VARCHAR(10), \"RETENTION\" INTEGER NOT NULL, " + ++ " \"SD_ID\" BIGINT, \"TBL_NAME\" VARCHAR(256), \"TBL_TYPE\" VARCHAR(128), " + ++ " \"VIEW_EXPANDED_TEXT\" LONG VARCHAR, \"VIEW_ORIGINAL_TEXT\" LONG VARCHAR, " + ++ " \"IS_REWRITE_ENABLED\" CHAR(1) NOT NULL DEFAULT \'N\', " + ++ " \"WRITE_ID\" BIGINT DEFAULT 0, " + ++ " PRIMARY KEY (TBL_ID))" ++ ); ++ } catch (SQLException e) { ++ if (e.getMessage() != null && e.getMessage().contains("already exists")) { ++ LOG.info("TBLS table already exist, ignoring"); ++ } else { ++ throw e; ++ } ++ } ++ ++ try { ++ stmt.execute("CREATE TABLE \"APP\".\"PARTITIONS\" (" + ++ " \"PART_ID\" BIGINT NOT NULL, \"CREATE_TIME\" INTEGER NOT NULL, " + ++ " \"LAST_ACCESS_TIME\" INTEGER NOT NULL, \"PART_NAME\" VARCHAR(767), " + ++ " \"SD_ID\" BIGINT, \"TBL_ID\" BIGINT, " + ++ " \"WRITE_ID\" BIGINT DEFAULT 0, " + ++ " PRIMARY KEY (PART_ID))" ++ ); ++ } catch (SQLException e) { ++ if (e.getMessage() != null && e.getMessage().contains("already exists")) { ++ LOG.info("PARTITIONS table already exist, ignoring"); ++ } else { ++ throw e; ++ } ++ } ++ ++ try { ++ stmt.execute("CREATE TABLE \"APP\".\"TABLE_PARAMS\" (" + ++ " 
\"TBL_ID\" BIGINT NOT NULL, \"PARAM_KEY\" VARCHAR(256) NOT NULL, " + ++ " \"PARAM_VALUE\" CLOB, " + ++ " PRIMARY KEY (TBL_ID, PARAM_KEY))" ++ ); ++ } catch (SQLException e) { ++ if (e.getMessage() != null && e.getMessage().contains("already exists")) { ++ LOG.info("TABLE_PARAMS table already exist, ignoring"); ++ } else { ++ throw e; ++ } ++ } ++ ++ try { ++ stmt.execute("CREATE TABLE \"APP\".\"PARTITION_PARAMS\" (" + ++ " \"PART_ID\" BIGINT NOT NULL, \"PARAM_KEY\" VARCHAR(256) NOT NULL, " + ++ " \"PARAM_VALUE\" VARCHAR(4000), " + ++ " PRIMARY KEY (PART_ID, PARAM_KEY))" ++ ); ++ } catch (SQLException e) { ++ if (e.getMessage() != null && e.getMessage().contains("already exists")) { ++ LOG.info("PARTITION_PARAMS table already exist, ignoring"); ++ } else { ++ throw e; ++ } ++ } ++ ++ try { + stmt.execute("CREATE TABLE \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\" VARCHAR(256) NOT " + + + "NULL, \"NEXT_VAL\" BIGINT NOT NULL)" + ); + } catch (SQLException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + LOG.info("SEQUENCE_TABLE table already exist, ignoring"); + } else { + throw e; + } + } + + try { + stmt.execute("CREATE TABLE \"APP\".\"NOTIFICATION_SEQUENCE\" (\"NNI_ID\" BIGINT NOT NULL, " + + + "\"NEXT_EVENT_ID\" BIGINT NOT NULL)" + ); + } catch (SQLException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + LOG.info("NOTIFICATION_SEQUENCE table already exist, ignoring"); + } else { + throw e; + } + } + + try { + stmt.execute("CREATE TABLE \"APP\".\"NOTIFICATION_LOG\" (\"NL_ID\" BIGINT NOT NULL, " + + "\"DB_NAME\" VARCHAR(128), \"EVENT_ID\" BIGINT NOT NULL, \"EVENT_TIME\" INTEGER NOT" + + + " NULL, \"EVENT_TYPE\" VARCHAR(32) NOT NULL, \"MESSAGE\" CLOB, \"TBL_NAME\" " + + "VARCHAR" + + "(256), \"MESSAGE_FORMAT\" VARCHAR(16))" + ); + } catch (SQLException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + LOG.info("NOTIFICATION_LOG table already exist, ignoring"); + } else { + throw e; + } + } + + stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") " + + "SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', " + + "1)) tmp_table WHERE NOT EXISTS ( SELECT \"NEXT_VAL\" FROM \"APP\"" + + ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 'org.apache.hadoop.hive.metastore" + + ".model.MNotificationLog')"); + + stmt.execute("INSERT INTO \"APP\".\"NOTIFICATION_SEQUENCE\" (\"NNI_ID\", \"NEXT_EVENT_ID\")" + + " SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( SELECT " + + "\"NEXT_EVENT_ID\" FROM \"APP\".\"NOTIFICATION_SEQUENCE\")"); + + try { + stmt.execute("CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (" + + "WNL_ID bigint NOT NULL," + + "WNL_TXNID bigint NOT NULL," + + "WNL_WRITEID bigint NOT NULL," + + "WNL_DATABASE varchar(128) NOT NULL," + + "WNL_TABLE varchar(128) NOT NULL," + + "WNL_PARTITION varchar(1024) NOT NULL," + + "WNL_TABLE_OBJ clob NOT NULL," + + "WNL_PARTITION_OBJ clob," + + "WNL_FILES clob," + + "WNL_EVENT_TIME integer NOT NULL," + + "PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION))" + ); + } catch (SQLException e) { + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + LOG.info("TXN_WRITE_NOTIFICATION_LOG table already exist, ignoring"); + } else { + throw e; + } + } + + stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") " + + "SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', " + + "1)) tmp_table WHERE 
NOT EXISTS ( SELECT \"NEXT_VAL\" FROM \"APP\"" + + ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 'org.apache.hadoop.hive.metastore" + + ".model.MTxnWriteNotificationLog')"); + } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException re) { + LOG.error("Error rolling back: " + re.getMessage()); + } + + // Another thread might have already created these tables. + if (e.getMessage() != null && e.getMessage().contains("already exists")) { + LOG.info("Txn tables already exist, returning"); + return; + } + + // This might be a deadlock, if so, let's retry + if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) { + LOG.warn("Caught deadlock, retrying db creation"); + prepDb(conf); + } else { + throw e; + } + } finally { + deadlockCnt = 0; + closeResources(conn, stmt, null); + } + } + + public static void cleanDb(Configuration conf) throws Exception { + int retryCount = 0; + while(++retryCount <= 3) { + boolean success = true; + Connection conn = null; + Statement stmt = null; + try { + conn = getConnection(conf); + stmt = conn.createStatement(); + + // We want to try these, whether they succeed or fail. + try { + stmt.execute("DROP INDEX HL_TXNID_INDEX"); + } catch (SQLException e) { + if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) { + //42X65/3000 means index doesn't exist + LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() + + "State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount); + success = false; + } + } + + success &= dropTable(stmt, "TXN_COMPONENTS", retryCount); + success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount); + success &= dropTable(stmt, "TXNS", retryCount); + success &= dropTable(stmt, "NEXT_TXN_ID", retryCount); + success &= dropTable(stmt, "TXN_TO_WRITE_ID", retryCount); + success &= dropTable(stmt, "NEXT_WRITE_ID", retryCount); + success &= dropTable(stmt, "MIN_HISTORY_LEVEL", retryCount); + success &= dropTable(stmt, "HIVE_LOCKS", retryCount); + success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount); + success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount); + success &= dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID", retryCount); + success &= dropTable(stmt, "COMPLETED_COMPACTIONS", retryCount); + success &= dropTable(stmt, "AUX_TABLE", retryCount); + success &= dropTable(stmt, "WRITE_SET", retryCount); + success &= dropTable(stmt, "REPL_TXN_MAP", retryCount); + /* + * Don't drop NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE as its used by other + * table which are not txn related to generate primary key. So if these tables are dropped + * and other tables are not dropped, then it will create key duplicate error while inserting + * to other table. 
+ */ + } finally { + closeResources(conn, stmt, null); + } + if(success) { + return; + } + } + throw new RuntimeException("Failed to clean up txn tables"); + } + + private static boolean dropTable(Statement stmt, String name, int retryCount) throws SQLException { + for (int i = 0; i < 3; i++) { + try { + stmt.execute("DROP TABLE " + name); + LOG.debug("Successfully dropped table " + name); + return true; + } catch (SQLException e) { + if ("42Y55".equals(e.getSQLState()) && 30000 == e.getErrorCode()) { + LOG.debug("Not dropping " + name + " because it doesn't exist"); + //failed because object doesn't exist + return true; + } + if ("X0Y25".equals(e.getSQLState()) && 30000 == e.getErrorCode()) { + // Intermittent failure + LOG.warn("Intermittent drop failure, retrying, try number " + i); + continue; + } + LOG.error("Unable to drop table " + name + ": " + e.getMessage() + + " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount); + } + } + LOG.error("Failed to drop table, don't know why"); + return false; + } + + /** + * A tool to count the number of partitions, tables, + * and databases locked by a particular lockId. + * + * @param lockId lock id to look for lock components + * + * @return number of components, or 0 if there is no lock + */ + public static int countLockComponents(Configuration conf, long lockId) throws Exception { + Connection conn = null; + PreparedStatement stmt = null; + ResultSet rs = null; + try { + conn = getConnection(conf); + stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?"); + stmt.setLong(1, lockId); + rs = stmt.executeQuery(); + if (!rs.next()) { + return 0; + } + return rs.getInt(1); + } finally { + closeResources(conn, stmt, rs); + } + } + + /** ++ * Return true if the transaction with the given txnId still has a row in TXNS, ++ * i.e. it is open or aborted. ++ * @param conf metastore configuration used to open the JDBC connection ++ * @param txnId transaction id to search for ++ * @return true if a TXNS row exists for the given txnId, false otherwise ++ * @throws Exception ++ */ ++ public static boolean isOpenOrAbortedTransaction(Configuration conf, long txnId) throws Exception { ++ Connection conn = null; ++ PreparedStatement stmt = null; ++ ResultSet rs = null; ++ try { ++ conn = getConnection(conf); ++ conn.setAutoCommit(false); ++ conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); ++ ++ stmt = conn.prepareStatement("SELECT txn_id FROM TXNS WHERE txn_id = ?"); ++ stmt.setLong(1, txnId); ++ rs = stmt.executeQuery(); ++ if (!rs.next()) { ++ return false; ++ } else { ++ return true; ++ } ++ } finally { ++ closeResources(conn, stmt, rs); ++ } ++ } ++ ++ /** + * Utility method used to run COUNT queries like "select count(*) from ..."
against metastore tables + * @param countQuery countQuery text + * @return count countQuery result + * @throws Exception + */ + public static int countQueryAgent(Configuration conf, String countQuery) throws Exception { + Connection conn = null; + Statement stmt = null; + ResultSet rs = null; + try { + conn = getConnection(conf); + stmt = conn.createStatement(); + rs = stmt.executeQuery(countQuery); + if (!rs.next()) { + return 0; + } + return rs.getInt(1); + } finally { + closeResources(conn, stmt, rs); + } + } + @VisibleForTesting + public static String queryToString(Configuration conf, String query) throws Exception { + return queryToString(conf, query, true); + } + public static String queryToString(Configuration conf, String query, boolean includeHeader) + throws Exception { + Connection conn = null; + Statement stmt = null; + ResultSet rs = null; + StringBuilder sb = new StringBuilder(); + try { + conn = getConnection(conf); + stmt = conn.createStatement(); + rs = stmt.executeQuery(query); + ResultSetMetaData rsmd = rs.getMetaData(); + if(includeHeader) { + for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { + sb.append(rsmd.getColumnName(colPos)).append(" "); + } + sb.append('\n'); + } + while(rs.next()) { + for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) { + sb.append(rs.getObject(colPos)).append(" "); + } + sb.append('\n'); + } + } finally { + closeResources(conn, stmt, rs); + } + return sb.toString(); + } + + static Connection getConnection(Configuration conf) throws Exception { + String jdbcDriver = MetastoreConf.getVar(conf, ConfVars.CONNECTION_DRIVER); + Driver driver = (Driver) Class.forName(jdbcDriver).newInstance(); + Properties prop = new Properties(); + String driverUrl = MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY); + String user = MetastoreConf.getVar(conf, ConfVars.CONNECTION_USER_NAME); + String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD); + prop.setProperty("user", user); + prop.setProperty("password", passwd); + Connection conn = driver.connect(driverUrl, prop); + conn.setAutoCommit(true); + return conn; + } + + static void closeResources(Connection conn, Statement stmt, ResultSet rs) { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + LOG.error("Error closing ResultSet: " + e.getMessage()); + } + } + + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + System.err.println("Error closing Statement: " + e.getMessage()); + } + } + + if (conn != null) { + try { + conn.rollback(); + } catch (SQLException e) { + System.err.println("Error rolling back: " + e.getMessage()); + } + try { + conn.close(); + } catch (SQLException e) { + System.err.println("Error closing Connection: " + e.getMessage()); + } + } + } + }
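As the class comment above notes, TxnDbUtil exists so unit tests can create and destroy the transaction schema and query the metastore tables directly. The following is a minimal sketch of how a test might drive these helpers, assuming an embedded Derby metastore obtained via MetastoreConf.newMetastoreConf() and JUnit 4 on the classpath; the test class name and assertions are hypothetical and not part of this change.

package org.apache.hadoop.hive.metastore.txn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

// Hypothetical test class, shown only to illustrate the TxnDbUtil lifecycle.
public class TestTxnDbUtilSketch {

  private Configuration conf;

  @Before
  public void setUp() throws Exception {
    conf = MetastoreConf.newMetastoreConf();
    TxnDbUtil.setConfValues(conf);   // point the config at DbTxnManager and enable concurrency
    TxnDbUtil.prepDb(conf);          // create the txn tables in the embedded Derby metastore
  }

  @After
  public void tearDown() throws Exception {
    TxnDbUtil.cleanDb(conf);         // drop the txn tables so the next test starts clean
  }

  @Test
  public void freshSchemaHasNoCompactions() throws Exception {
    // countQueryAgent runs an arbitrary COUNT query against the metastore tables.
    assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE"));
    // queryToString is handy for dumping a table's contents when a test fails.
    System.out.println(TxnDbUtil.queryToString(conf, "select * from NEXT_TXN_ID"));
  }
}

Because prepDb(conf) returns early when the txn tables already exist and cleanDb(conf) retries its drops up to three times, this setUp/tearDown pairing can be repeated safely across test methods.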