http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
----------------------------------------------------------------------
diff --cc 
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
index 0000000,4a97f89..267c9e8
mode 000000,100644..100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
@@@ -1,0 -1,155 +1,162 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.metastore.model;
+ 
+ import java.util.List;
+ import java.util.Map;
+ 
+ public class MPartition {
+ 
+   private String partitionName; // partitionname ==>  (key=value/)*(key=value)
+   private MTable table; 
+   private List<String> values;
+   private int createTime;
+   private int lastAccessTime;
+   private MStorageDescriptor sd;
+   private Map<String, String> parameters;
 -  
++  private long writeId;
+   
+   public MPartition() {}
+   
+   /**
+    * @param partitionName
+    * @param table
+    * @param values
+    * @param createTime
+    * @param lastAccessTime
+    * @param sd
+    * @param parameters
+    */
+   public MPartition(String partitionName, MTable table, List<String> values, 
int createTime,
+       int lastAccessTime, MStorageDescriptor sd, Map<String, String> 
parameters) {
+     this.partitionName = partitionName;
+     this.table = table;
+     this.values = values;
+     this.createTime = createTime;
+     this.lastAccessTime = lastAccessTime;
+     this.sd = sd;
+     this.parameters = parameters;
+   }
+ 
+   /**
+    * @return the lastAccessTime
+    */
+   public int getLastAccessTime() {
+     return lastAccessTime;
+   }
+ 
+   /**
+    * @param lastAccessTime the lastAccessTime to set
+    */
+   public void setLastAccessTime(int lastAccessTime) {
+     this.lastAccessTime = lastAccessTime;
+   }
+ 
+   /**
+    * @return the values
+    */
+   public List<String> getValues() {
+     return values;
+   }
+ 
+   /**
+    * @param values the values to set
+    */
+   public void setValues(List<String> values) {
+     this.values = values;
+   }
+ 
+   /**
+    * @return the table
+    */
+   public MTable getTable() {
+     return table;
+   }
+ 
+   /**
+    * @param table the table to set
+    */
+   public void setTable(MTable table) {
+     this.table = table;
+   }
+ 
+   /**
+    * @return the sd
+    */
+   public MStorageDescriptor getSd() {
+     return sd;
+   }
+ 
+   /**
+    * @param sd the sd to set
+    */
+   public void setSd(MStorageDescriptor sd) {
+     this.sd = sd;
+   }
+ 
+   /**
+    * @return the parameters
+    */
+   public Map<String, String> getParameters() {
+     return parameters;
+   }
+ 
+   /**
+    * @param parameters the parameters to set
+    */
+   public void setParameters(Map<String, String> parameters) {
+     this.parameters = parameters;
+   }
+ 
+   /**
+    * @return the partitionName
+    */
+   public String getPartitionName() {
+     return partitionName;
+   }
+ 
+   /**
+    * @param partitionName the partitionName to set
+    */
+   public void setPartitionName(String partitionName) {
+     this.partitionName = partitionName;
+   }
+ 
+   /**
+    * @return the createTime
+    */
+   public int getCreateTime() {
+     return createTime;
+   }
+ 
+   /**
+    * @param createTime the createTime to set
+    */
+   public void setCreateTime(int createTime) {
+     this.createTime = createTime;
+   }
+ 
++  public long getWriteId() {
++    return writeId;
++  }
++
++  public void setWriteId(long writeId) {
++    this.writeId = writeId;
++  }
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
----------------------------------------------------------------------
diff --cc 
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
index 0000000,38ad479..deeb971
mode 000000,100644..100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
@@@ -1,0 -1,273 +1,283 @@@
++
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.metastore.model;
+ 
+ import java.util.List;
+ import java.util.Map;
+ 
+ public class MTable {
+   
+   private String tableName;
+   private MDatabase database;
+   private MStorageDescriptor sd;
+   private String owner;
+   private String ownerType;
+   private int createTime;
+   private int lastAccessTime;
+   private int retention;
+   private List<MFieldSchema> partitionKeys;
+   private Map<String, String> parameters;
+   private String viewOriginalText;
+   private String viewExpandedText;
+   private boolean rewriteEnabled;
+   private String tableType;
++  private long writeId;
+ 
+   public MTable() {}
+ 
+   /**
+    * @param tableName
+    * @param database
+    * @param sd
+    * @param owner
+    * @param ownerType
+    * @param createTime
+    * @param lastAccessTime
+    * @param retention
+    * @param partitionKeys
+    * @param parameters
+    * @param viewOriginalText
+    * @param viewExpandedText
+    * @param tableType
+    */
+   public MTable(String tableName, MDatabase database, MStorageDescriptor sd, 
String owner, String ownerType,
+       int createTime, int lastAccessTime, int retention, List<MFieldSchema> 
partitionKeys,
+       Map<String, String> parameters, String viewOriginalText, String 
viewExpandedText,
+       boolean rewriteEnabled, String tableType) {
+     this.tableName = tableName;
+     this.database = database;
+     this.sd = sd;
+     this.owner = owner;
+     this.ownerType = ownerType;
+     this.createTime = createTime;
+     this.setLastAccessTime(lastAccessTime);
+     this.retention = retention;
+     this.partitionKeys = partitionKeys;
+     this.parameters = parameters;
+     this.viewOriginalText = viewOriginalText;
+     this.viewExpandedText = viewExpandedText;
+     this.rewriteEnabled = rewriteEnabled;
+     this.tableType = tableType;
+   }
+ 
+   /**
+    * @return the tableName
+    */
+   public String getTableName() {
+     return tableName;
+   }
+ 
+   /**
+    * @param tableName the tableName to set
+    */
+   public void setTableName(String tableName) {
+     this.tableName = tableName;
+   }
+ 
+   /**
+    * @return the sd
+    */
+   public MStorageDescriptor getSd() {
+     return sd;
+   }
+ 
+   /**
+    * @param sd the sd to set
+    */
+   public void setSd(MStorageDescriptor sd) {
+     this.sd = sd;
+   }
+ 
+   /**
+    * @return the partKeys
+    */
+   public List<MFieldSchema> getPartitionKeys() {
+     return partitionKeys;
+   }
+ 
+   /**
+    * @param partKeys the partKeys to set
+    */
+   public void setPartitionKeys(List<MFieldSchema> partKeys) {
+     this.partitionKeys = partKeys;
+   }
+ 
+   /**
+    * @return the parameters
+    */
+   public Map<String, String> getParameters() {
+     return parameters;
+   }
+ 
+   /**
+    * @param parameters the parameters to set
+    */
+   public void setParameters(Map<String, String> parameters) {
+     this.parameters = parameters;
+   }
+ 
+   /**
+    * @return the original view text, or null if this table is not a view
+    */
+   public String getViewOriginalText() {
+     return viewOriginalText;
+   }
+ 
+   /**
+    * @param viewOriginalText the original view text to set
+    */
+   public void setViewOriginalText(String viewOriginalText) {
+     this.viewOriginalText = viewOriginalText;
+   }
+ 
+   /**
+    * @return the expanded view text, or null if this table is not a view
+    */
+   public String getViewExpandedText() {
+     return viewExpandedText;
+   }
+ 
+   /**
+    * @param viewExpandedText the expanded view text to set
+    */
+   public void setViewExpandedText(String viewExpandedText) {
+     this.viewExpandedText = viewExpandedText;
+   }
+ 
+   /**
+    * @return whether the view can be used for rewriting queries
+    */
+   public boolean isRewriteEnabled() {
+     return rewriteEnabled;
+   }
+ 
+   /**
+    * @param rewriteEnabled whether the view can be used for rewriting queries
+    */
+   public void setRewriteEnabled(boolean rewriteEnabled) {
+     this.rewriteEnabled = rewriteEnabled;
+   }
+ 
+   /**
+    * @return the owner
+    */
+   public String getOwner() {
+     return owner;
+   }
+ 
+   /**
+    * @param owner the owner to set
+    */
+   public void setOwner(String owner) {
+     this.owner = owner;
+   }
+ 
+   /**
+    * @return the owner type
+    */
+   public String getOwnerType() {
+     return ownerType;
+   }
+ 
+   /**
+    * @param ownerType the owner type to set
+    */
+   public void setOwnerType(String ownerType) {
+     this.ownerType = ownerType;
+   }
+ 
+   /**
+    * @return the createTime
+    */
+   public int getCreateTime() {
+     return createTime;
+   }
+ 
+   /**
+    * @param createTime the createTime to set
+    */
+   public void setCreateTime(int createTime) {
+     this.createTime = createTime;
+   }
+ 
+   /**
+    * @return the database
+    */
+   public MDatabase getDatabase() {
+     return database;
+   }
+ 
+   /**
+    * @param database the database to set
+    */
+   public void setDatabase(MDatabase database) {
+     this.database = database;
+   }
+ 
+   /**
+    * @return the retention
+    */
+   public int getRetention() {
+     return retention;
+   }
+ 
+   /**
+    * @param retention the retention to set
+    */
+   public void setRetention(int retention) {
+     this.retention = retention;
+   }
+ 
+   /**
+    * @param lastAccessTime the lastAccessTime to set
+    */
+   public void setLastAccessTime(int lastAccessTime) {
+     this.lastAccessTime = lastAccessTime;
+   }
+ 
+   /**
+    * @return the lastAccessTime
+    */
+   public int getLastAccessTime() {
+     return lastAccessTime;
+   }
+ 
+   /**
+    * @param tableType the tableType to set
+    */
+   public void setTableType(String tableType) {
+     this.tableType = tableType;
+   }
+ 
+   /**
+    * @return the tableType
+    */
+   public String getTableType() {
+     return tableType;
+   }
++
++  public long getWriteId() {
++    return writeId;
++  }
++
++  public void setWriteId(long writeId) {
++    this.writeId = writeId;
++  }
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --cc 
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 0000000,4e3068d..1f559e9
mode 000000,100644..100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@@ -1,0 -1,1107 +1,1158 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hive.metastore.txn;
+ 
++import org.apache.hadoop.hive.common.StatsSetupConst;
+ import org.apache.hadoop.hive.common.classification.RetrySemantics;
+ import org.apache.hadoop.hive.metastore.api.CompactionType;
+ import org.apache.hadoop.hive.metastore.api.MetaException;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.util.StringUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.sql.Connection;
+ import java.sql.PreparedStatement;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ 
+ /**
+  * Extends the transaction handler with methods needed only by the compactor 
threads.  These
+  * methods are not available through the thrift interface.
+  */
+ class CompactionTxnHandler extends TxnHandler {
+   static final private String CLASS_NAME = 
CompactionTxnHandler.class.getName();
+   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+ 
+   public CompactionTxnHandler() {
+   }
+ 
+   /**
+    * This will look through the completed_txn_components table and look for 
partitions or tables
+    * that may be ready for compaction.  Also, look through txns and 
txn_components tables for
+    * aborted transactions that we should add to the list.
+    * @param maxAborted Maximum number of aborted queries to allow before 
marking this as a
+    *                   potential compaction.
+    * @return list of CompactionInfo structs.  These will not have id, type,
+    * or runAs set since these are only potential compactions not actual ones.
+    */
+   @Override
+   @RetrySemantics.ReadOnly
+   public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws 
MetaException {
+     Connection dbConn = null;
+     Set<CompactionInfo> response = new HashSet<>();
+     Statement stmt = null;
+     ResultSet rs = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         // Check for completed transactions
+         String s = "select distinct ctc_database, ctc_table, " +
+           "ctc_partition from COMPLETED_TXN_COMPONENTS";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         while (rs.next()) {
+           CompactionInfo info = new CompactionInfo();
+           info.dbname = rs.getString(1);
+           info.tableName = rs.getString(2);
+           info.partName = rs.getString(3);
+           response.add(info);
+         }
+         rs.close();
+ 
+         // Check for aborted txns
+         s = "select tc_database, tc_table, tc_partition " +
+           "from TXNS, TXN_COMPONENTS " +
+           "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
+           "group by tc_database, tc_table, tc_partition " +
+           "having count(*) > " + maxAborted;
+ 
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         while (rs.next()) {
+           CompactionInfo info = new CompactionInfo();
+           info.dbname = rs.getString(1);
+           info.tableName = rs.getString(2);
+           info.partName = rs.getString(3);
+           info.tooManyAborts = true;
+           response.add(info);
+         }
+ 
+         LOG.debug("Going to rollback");
+         dbConn.rollback();
+       } catch (SQLException e) {
+         LOG.error("Unable to connect to transaction database " + 
e.getMessage());
+         checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + 
maxAborted + ")");
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+       return response;
+     }
+     catch (RetryException e) {
+       return findPotentialCompactions(maxAborted);
+     }
+   }
+ 
+   /**
+    * Sets the user to run as.  This is for the case
+    * where the request was generated by the user and so the worker must set 
this value later.
+    * @param cq_id id of this entry in the queue
+    * @param user user to run the jobs as
+    */
+   @Override
+   @RetrySemantics.Idempotent
+   public void setRunAs(long cq_id, String user) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' 
where cq_id = " + cq_id;
+         LOG.debug("Going to execute update <" + s + ">");
+         int updCnt = stmt.executeUpdate(s);
+         if (updCnt != 1) {
+           LOG.error("Unable to set cq_run_as=" + user + " for compaction 
record with cq_id=" + cq_id + ".  updCnt=" + updCnt);
+           LOG.debug("Going to rollback");
+           dbConn.rollback();
+         }
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to update compaction queue, " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "setRunAs(cq_id:" + cq_id + ",user:" + user 
+")");
+       } finally {
+         closeDbConn(dbConn);
+         closeStmt(stmt);
+       }
+     } catch (RetryException e) {
+       setRunAs(cq_id, user);
+     }
+   }
+ 
+   /**
+    * This will grab the next compaction request off of
+    * the queue, and assign it to the worker.
+    * @param workerId id of the worker calling this, will be recorded in the db
+    * @return an info element for this compaction request, or null if there is 
no work to do now.
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public CompactionInfo findNextToCompact(String workerId) throws 
MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       //need a separate stmt for executeUpdate() otherwise it will close the 
ResultSet(HIVE-12725)
+       Statement updStmt = null;
+       ResultSet rs = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
+           "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = 
'" + INITIATED_STATE + "'";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (!rs.next()) {
+           LOG.debug("No compactions found ready to compact");
+           dbConn.rollback();
+           return null;
+         }
+         updStmt = dbConn.createStatement();
+         do {
+           CompactionInfo info = new CompactionInfo();
+           info.id = rs.getLong(1);
+           info.dbname = rs.getString(2);
+           info.tableName = rs.getString(3);
+           info.partName = rs.getString(4);
+           info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
+           info.properties = rs.getString(6);
+           // Now, update this record as being worked on by this worker.
+           long now = getDbTime(dbConn);
+           s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', 
" +
+             "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where 
cq_id = " + info.id +
+             " AND cq_state='" + INITIATED_STATE + "'";
+           LOG.debug("Going to execute update <" + s + ">");
+           int updCount = updStmt.executeUpdate(s);
+           if(updCount == 1) {
+             dbConn.commit();
+             return info;
+           }
+           if(updCount == 0) {
+             LOG.debug("Another Worker picked up " + info);
+             continue;
+           }
+           LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for 
compaction record: " +
+             info + ". updCnt=" + updCount + ".");
+           dbConn.rollback();
+           return null;
+         } while( rs.next());
+         dbConn.rollback();
+         return null;
+       } catch (SQLException e) {
+         LOG.error("Unable to select next element for compaction, " + 
e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "findNextToCompact(workerId:" + workerId + 
")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         closeStmt(updStmt);
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       return findNextToCompact(workerId);
+     }
+   }
+ 
+   /**
+    * This will mark an entry in the queue as compacted
+    * and put it in the ready to clean state.
+    * @param info info on the compaction entry to mark as compacted.
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public void markCompacted(CompactionInfo info) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "update COMPACTION_QUEUE set cq_state = '" + 
READY_FOR_CLEANING + "', " +
+           "cq_worker_id = null where cq_id = " + info.id;
+         LOG.debug("Going to execute update <" + s + ">");
+         int updCnt = stmt.executeUpdate(s);
+         if (updCnt != 1) {
+           LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for 
compaction record: " + info + ". updCnt=" + updCnt);
+           LOG.debug("Going to rollback");
+           dbConn.rollback();
+         }
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to update compaction queue " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "markCompacted(" + info + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         closeStmt(stmt);
+         closeDbConn(dbConn);
+       }
+     } catch (RetryException e) {
+       markCompacted(info);
+     }
+   }
+ 
+   /**
+    * Find entries in the queue that are ready to
+    * be cleaned.
+    * @return information on the entry in the queue.
+    */
+   @Override
+   @RetrySemantics.ReadOnly
+   public List<CompactionInfo> findReadyToClean() throws MetaException {
+     Connection dbConn = null;
+     List<CompactionInfo> rc = new ArrayList<>();
+ 
+     Statement stmt = null;
+     ResultSet rs = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "select cq_id, cq_database, cq_table, cq_partition, "
+                 + "cq_type, cq_run_as, cq_highest_write_id from 
COMPACTION_QUEUE where cq_state = '"
+                 + READY_FOR_CLEANING + "'";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         while (rs.next()) {
+           CompactionInfo info = new CompactionInfo();
+           info.id = rs.getLong(1);
+           info.dbname = rs.getString(2);
+           info.tableName = rs.getString(3);
+           info.partName = rs.getString(4);
+           switch (rs.getString(5).charAt(0)) {
+             case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
+             case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+             default: throw new MetaException("Unexpected compaction type " + 
rs.getString(5));
+           }
+           info.runAs = rs.getString(6);
+           info.highestWriteId = rs.getLong(7);
+           rc.add(info);
+         }
+         LOG.debug("Going to rollback");
+         dbConn.rollback();
+         return rc;
+       } catch (SQLException e) {
+         LOG.error("Unable to select next element for cleaning, " + 
e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "findReadyToClean");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       return findReadyToClean();
+     }
+   }
+ 
+   /**
+    * This will remove an entry from the queue after
+    * it has been compacted.
+    * 
+    * @param info info on the compaction entry to remove
+    */
+   @Override
+   @RetrySemantics.CannotRetry
+   public void markCleaned(CompactionInfo info) throws MetaException {
+     try {
+       Connection dbConn = null;
+       PreparedStatement pStmt = null;
+       ResultSet rs = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, 
CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, 
CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from 
COMPACTION_QUEUE WHERE CQ_ID = ?");
+         pStmt.setLong(1, info.id);
+         rs = pStmt.executeQuery();
+         if(rs.next()) {
+           info = CompactionInfo.loadFullFromCompactionQueue(rs);
+         }
+         else {
+           throw new IllegalStateException("No record with CQ_ID=" + info.id + 
" found in COMPACTION_QUEUE");
+         }
+         close(rs);
+         String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+         pStmt = dbConn.prepareStatement(s);
+         pStmt.setLong(1, info.id);
+         LOG.debug("Going to execute update <" + s + ">");
+         int updCount = pStmt.executeUpdate();
+         if (updCount != 1) {
+           LOG.error("Unable to delete compaction record: " + info +  ".  
Update count=" + updCount);
+           LOG.debug("Going to rollback");
+           dbConn.rollback();
+         }
+         pStmt = dbConn.prepareStatement("insert into 
COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, 
CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, 
CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, 
?,?,?,?,?, ?,?,?,?)");
+         info.state = SUCCEEDED_STATE;
+         CompactionInfo.insertIntoCompletedCompactions(pStmt, info, 
getDbTime(dbConn));
+         updCount = pStmt.executeUpdate();
+ 
+         // Remove entries from completed_txn_components as well, so we don't 
start looking there
+         // again but only up to the highest write ID include in this 
compaction job.
+         //highestWriteId will be NULL in upgrade scenarios
+         s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and 
" +
+             "ctc_table = ?";
+         if (info.partName != null) {
+           s += " and ctc_partition = ?";
+         }
+         if(info.highestWriteId != 0) {
+           s += " and ctc_writeid <= ?";
+         }
+         pStmt = dbConn.prepareStatement(s);
+         int paramCount = 1;
+         pStmt.setString(paramCount++, info.dbname);
+         pStmt.setString(paramCount++, info.tableName);
+         if (info.partName != null) {
+           pStmt.setString(paramCount++, info.partName);
+         }
+         if(info.highestWriteId != 0) {
+           pStmt.setLong(paramCount++, info.highestWriteId);
+         }
+         LOG.debug("Going to execute update <" + s + ">");
+         if (pStmt.executeUpdate() < 1) {
+           LOG.error("Expected to remove at least one row from 
completed_txn_components when " +
+             "marking compaction entry as clean!");
+         }
+ 
+         s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = 
tc_txnid and txn_state = '" +
+           TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
+         if (info.highestWriteId != 0) s += " and tc_writeid <= ?";
+         if (info.partName != null) s += " and tc_partition = ?";
+ 
+         pStmt = dbConn.prepareStatement(s);
+         paramCount = 1;
+         pStmt.setString(paramCount++, info.dbname);
+         pStmt.setString(paramCount++, info.tableName);
+         if(info.highestWriteId != 0) {
+           pStmt.setLong(paramCount++, info.highestWriteId);
+         }
+         if (info.partName != null) {
+           pStmt.setString(paramCount++, info.partName);
+         }
+ 
+         LOG.debug("Going to execute update <" + s + ">");
+         rs = pStmt.executeQuery();
+         List<Long> txnids = new ArrayList<>();
+         List<String> questions = new ArrayList<>();
+         while (rs.next()) {
+           long id = rs.getLong(1);
+           txnids.add(id);
+           questions.add("?");
+         }
+         // Remove entries from txn_components, as there may be aborted txn 
components
+         if (txnids.size() > 0) {
+           List<String> queries = new ArrayList<>();
+ 
+           // Prepare prefix and suffix
+           StringBuilder prefix = new StringBuilder();
+           StringBuilder suffix = new StringBuilder();
+ 
+           prefix.append("delete from TXN_COMPONENTS where ");
+ 
+           //because 1 txn may include different partitions/tables even in 
auto commit mode
+           suffix.append(" and tc_database = ?");
+           suffix.append(" and tc_table = ?");
+           if (info.partName != null) {
+             suffix.append(" and tc_partition = ?");
+           }
+ 
+           // Populate the complete query with provided prefix and suffix
+           List<Integer> counts = TxnUtils
+               .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, 
questions, "tc_txnid",
+                   true, false);
+           int totalCount = 0;
+           for (int i = 0; i < queries.size(); i++) {
+             String query = queries.get(i);
+             int insertCount = counts.get(i);
+ 
+             LOG.debug("Going to execute update <" + query + ">");
+             pStmt = dbConn.prepareStatement(query);
+             for (int j = 0; j < insertCount; j++) {
+               pStmt.setLong(j + 1, txnids.get(totalCount + j));
+             }
+             totalCount += insertCount;
+             paramCount = insertCount + 1;
+             pStmt.setString(paramCount++, info.dbname);
+             pStmt.setString(paramCount++, info.tableName);
+             if (info.partName != null) {
+               pStmt.setString(paramCount++, info.partName);
+             }
+             int rc = pStmt.executeUpdate();
+             LOG.debug("Removed " + rc + " records from txn_components");
+ 
+             // Don't bother cleaning from the txns table.  A separate call 
will do that.  We don't
+             // know here which txns still have components from other tables 
or partitions in the
+             // table, so we don't know which ones we can and cannot clean.
+           }
+         }
+ 
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to delete from compaction queue " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "markCleaned(" + info + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, pStmt, dbConn);
+       }
+     } catch (RetryException e) {
+       markCleaned(info);
+     }
+   }
+ 
+   /**
+    * Clean up entries from TXN_TO_WRITE_ID table less than 
min_uncommited_txnid as found by
+    * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), 
min(Aborted TXNS.txn_id)).
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public void cleanTxnToWriteIdTable() throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       ResultSet rs = null;
+ 
+       try {
+         // We query for minimum values in all the queries and they can only 
increase by any concurrent
+         // operations. So, READ COMMITTED is sufficient.
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+ 
+         // First need to find the min_uncommitted_txnid which is currently 
seen by any open transactions.
+         // If there are no txns which are currently open or aborted in the 
system, then current value of
+         // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid.
+         String s = "select ntxn_next from NEXT_TXN_ID";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (!rs.next()) {
+           throw new MetaException("Transaction tables not properly " +
+                   "initialized, no record found in next_txn_id");
+         }
+         long minUncommittedTxnId = rs.getLong(1);
+ 
+         // If there are any open txns, then the minimum of min_open_txnid 
from MIN_HISTORY_LEVEL table
+         // could be the min_uncommitted_txnid if lesser than 
NEXT_TXN_ID.ntxn_next.
+         s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (rs.next()) {
+           long minOpenTxnId = rs.getLong(1);
+           if (minOpenTxnId > 0) {
+             minUncommittedTxnId = Math.min(minOpenTxnId, minUncommittedTxnId);
+           }
+         }
+ 
+         // If there are aborted txns, then the minimum aborted txnid could be 
the min_uncommitted_txnid
+         // if lesser than both NEXT_TXN_ID.ntxn_next and 
min(MIN_HISTORY_LEVEL .mhl_min_open_txnid).
+         s = "select min(txn_id) from TXNS where txn_state = " + 
quoteChar(TXN_ABORTED);
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (rs.next()) {
+           long minAbortedTxnId = rs.getLong(1);
+           if (minAbortedTxnId > 0) {
+             minUncommittedTxnId = Math.min(minAbortedTxnId, 
minUncommittedTxnId);
+           }
+         }
+ 
+         // As all txns below min_uncommitted_txnid are either committed or 
empty_aborted, we are allowed
+         // to cleanup the entries less than min_uncommitted_txnid from the 
TXN_TO_WRITE_ID table.
+         s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + 
minUncommittedTxnId;
+         LOG.debug("Going to execute delete <" + s + ">");
+         int rc = stmt.executeUpdate(s);
+         LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn 
Low-Water-Mark: " + minUncommittedTxnId);
+ 
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to delete from txns table " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "cleanTxnToWriteIdTable");
+         throw new MetaException("Unable to connect to transaction database " +
+                 StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       cleanTxnToWriteIdTable();
+     }
+   }
+ 
+   /**
+    * Clean up aborted transactions from txns that have no components in 
txn_components. The reason such
+    * txns exist can be that now work was done in this txn (e.g. Streaming 
opened TransactionBatch and
+    * abandoned it w/o doing any work) or due to {@link 
#markCleaned(CompactionInfo)} being called.
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public void cleanEmptyAbortedTxns() throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       ResultSet rs = null;
+       try {
+         //Aborted is a terminal state, so nothing about the txn can change
+         //after that, so READ COMMITTED is sufficient.
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "select txn_id from TXNS where " +
 -          "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
 -          "txn_state = '" + TXN_ABORTED + "'";
++            "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
++            "txn_state = '" + TXN_ABORTED + "'";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         List<Long> txnids = new ArrayList<>();
+         while (rs.next()) txnids.add(rs.getLong(1));
+         close(rs);
+         if(txnids.size() <= 0) {
+           return;
+         }
+         Collections.sort(txnids);//easier to read logs
++
+         List<String> queries = new ArrayList<>();
+         StringBuilder prefix = new StringBuilder();
+         StringBuilder suffix = new StringBuilder();
+ 
++        // Turn off COLUMN_STATS_ACCURATE for txnids' components in TBLS and 
PARTITIONS
++        prefix.append("select tbl_id from TBLS inner join DBS on TBLS.DB_ID = 
DBS.DB_ID "
++            + "inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME and 
t2w_table = TBLS.TBL_NAME"
++            + " and t2w_writeid = TBLS.WRITE_ID where ");
++        suffix.append("");
++        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, 
txnids, "t2w_txnid", true, false);
++
++        // Delete COLUMN_STATS_ACCURATE.BASIC_STATS rows from TABLE_PARAMS 
for the txnids.
++        List<StringBuilder> finalCommands = new ArrayList<>(queries.size());
++        for (int i = 0; i < queries.size(); i++) {
++          String query = queries.get(i);
++          finalCommands.add(i, new StringBuilder("delete from TABLE_PARAMS " +
++                  " where param_key = '" + "COLUMN_STATS_ACCURATE" + "' and 
tbl_id in ("));
++          finalCommands.get(i).append(query + ")");
++          LOG.debug("Going to execute update <" + finalCommands.get(i) + ">");
++          int rc = stmt.executeUpdate(finalCommands.get(i).toString());
++          LOG.info("Turned off " + rc + " COLUMN_STATE_ACCURATE.BASIC_STATS 
states from TBLS");
++        }
++
++        queries.clear();
++        prefix.setLength(0);
++        suffix.setLength(0);
++        finalCommands.clear();
++
++        // Delete COLUMN_STATS_ACCURATE.BASIC_STATS rows from 
PARTITIONS_PARAMS for the txnids.
++        prefix.append("select part_id from PARTITIONS "
++            + "inner join TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID "
++            + "inner join DBS on TBLS.DB_ID = DBS.DB_ID "
++            + "inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME and 
t2w_table = TBLS.TBL_NAME"
++            + " and t2w_writeid = TBLS.WRITE_ID where ");
++        suffix.append("");
++        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, 
txnids, "t2w_txnid", true, false);
++
++        for (int i = 0; i < queries.size(); i++) {
++          String query = queries.get(i);
++          finalCommands.add(i, new StringBuilder("delete from 
PARTITION_PARAMS " +
++                  " where param_key = '" + "COLUMN_STATS_ACCURATE" + "' and 
part_id in ("));
++          finalCommands.get(i).append(query + ")");
++          LOG.debug("Going to execute update <" + finalCommands.get(i) + ">");
++          int rc = stmt.executeUpdate(finalCommands.get(i).toString());
++          LOG.info("Turned off " + rc + " COLUMN_STATE_ACCURATE.BASIC_STATS 
states from PARTITIONS");
++        }
++
++        queries.clear();
++        prefix.setLength(0);
++        suffix.setLength(0);
++        finalCommands.clear();
++
++        // Delete from TXNS.
+         prefix.append("delete from TXNS where ");
+         suffix.append("");
+ 
+         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, 
txnids, "txn_id", false, false);
+ 
+         for (String query : queries) {
+           LOG.debug("Going to execute update <" + query + ">");
+           int rc = stmt.executeUpdate(query);
+           LOG.info("Removed " + rc + "  empty Aborted transactions from 
TXNS");
+         }
+         LOG.info("Aborted transactions removed from TXNS: " + txnids);
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to delete from txns table " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "cleanEmptyAbortedTxns");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       cleanEmptyAbortedTxns();
+     }
+   }
+ 
+   /**
+    * This will take all entries assigned to workers
+    * on a host return them to INITIATED state.  The initiator should use this 
at start up to
+    * clean entries from any workers that were in the middle of compacting 
when the metastore
+    * shutdown.  It does not reset entries from worker threads on other hosts 
as those may still
+    * be working.
+    * @param hostname Name of this host.  It is assumed this prefixes the 
thread's worker id,
+    *                 so that like hostname% will match the worker id.
+    */
+   @Override
+   @RetrySemantics.Idempotent
+   public void revokeFromLocalWorkers(String hostname) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start 
= null, cq_state = '"
+           + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and 
cq_worker_id like '"
+           +  hostname + "%'";
+         LOG.debug("Going to execute update <" + s + ">");
+         // It isn't an error if the following returns no rows, as the local 
workers could have died
+         // with  nothing assigned to them.
+         stmt.executeUpdate(s);
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to change dead worker's records back to initiated 
state " +
+           e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + 
hostname +")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         closeStmt(stmt);
+         closeDbConn(dbConn);
+       }
+     } catch (RetryException e) {
+       revokeFromLocalWorkers(hostname);
+     }
+   }
+ 
+   /**
+    * This call will return all compaction queue
+    * entries assigned to a worker but over the timeout back to the initiated 
state.
+    * This should be called by the initiator on start up and occasionally when 
running to clean up
+    * after dead threads.  At start up {@link #revokeFromLocalWorkers(String)} 
should be called
+    * first.
+    * @param timeout number of milliseconds since start time that should 
elapse before a worker is
+    *                declared dead.
+    */
+   @Override
+   @RetrySemantics.Idempotent
+   public void revokeTimedoutWorkers(long timeout) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         long latestValidStart = getDbTime(dbConn) - timeout;
+         stmt = dbConn.createStatement();
+         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start 
= null, cq_state = '"
+           + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and 
cq_start < "
+           +  latestValidStart;
+         LOG.debug("Going to execute update <" + s + ">");
+         // It isn't an error if the following returns no rows, as the local 
workers could have died
+         // with  nothing assigned to them.
+         stmt.executeUpdate(s);
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to change dead worker's records back to initiated 
state " +
+           e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout 
+ ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         closeStmt(stmt);
+         closeDbConn(dbConn);
+       }
+     } catch (RetryException e) {
+       revokeTimedoutWorkers(timeout);
+     }
+   }
+ 
+   /**
+    * Queries metastore DB directly to find columns in the table which have 
statistics information.
+    * If {@code ci} includes partition info then per partition stats info is 
examined, otherwise
+    * table level stats are examined.
+    * @throws MetaException
+    */
+   @Override
+   @RetrySemantics.ReadOnly
+   public List<String> findColumnsWithStats(CompactionInfo ci) throws 
MetaException {
+     Connection dbConn = null;
+     PreparedStatement pStmt = null;
+     ResultSet rs = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         String quote = getIdentifierQuoteString(dbConn);
+         StringBuilder bldr = new StringBuilder();
+         bldr.append("SELECT 
").append(quote).append("COLUMN_NAME").append(quote)
+           .append(" FROM ")
+           .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : 
"PART_COL_STATS"))
+           .append(quote)
+           .append(" WHERE ")
+           .append(quote).append("DB_NAME").append(quote).append(" = ?")
+           .append(" AND ").append(quote).append("TABLE_NAME").append(quote)
+           .append(" = ?");
+         if (ci.partName != null) {
+           bldr.append(" AND 
").append(quote).append("PARTITION_NAME").append(quote).append(" = ?");
+         }
+         String s = bldr.toString();
+         pStmt = dbConn.prepareStatement(s);
+         pStmt.setString(1, ci.dbname);
+         pStmt.setString(2, ci.tableName);
+         if (ci.partName != null) {
+           pStmt.setString(3, ci.partName);
+         }
+ 
+       /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? 
"TAB_COL_STATS" :
+           "PART_COL_STATS")
+          + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + 
ci.tableName + "'"
+         + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + 
"'");*/
+         LOG.debug("Going to execute <" + s + ">");
+         rs = pStmt.executeQuery();
+         List<String> columns = new ArrayList<>();
+         while (rs.next()) {
+           columns.add(rs.getString(1));
+         }
+         LOG.debug("Found columns to update stats: " + columns + " on " + 
ci.tableName +
+           (ci.partName == null ? "" : "/" + ci.partName));
+         dbConn.commit();
+         return columns;
+       } catch (SQLException e) {
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName +
+           (ci.partName == null ? "" : "/" + ci.partName) + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, pStmt, dbConn);
+       }
+     } catch (RetryException ex) {
+       return findColumnsWithStats(ci);
+     }
+   }
+ 
+   /**
+    * Record the highest txn id that the {@code ci} compaction job will pay 
attention to.
+    * This is the highest resolved txn id, i.e. such that there are no open 
txns with lower ids.
+    */
+   @Override
+   @RetrySemantics.Idempotent
+   public void setCompactionHighestWriteId(CompactionInfo ci, long 
highestWriteId) throws MetaException {
+     Connection dbConn = null;
+     Statement stmt = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET 
CQ_HIGHEST_WRITE_ID = " + highestWriteId +
+           " WHERE CQ_ID = " + ci.id);
+         if(updCount != 1) {
+           throw new IllegalStateException("Could not find record in 
COMPACTION_QUEUE for " + ci);
+         }
+         dbConn.commit();
+       } catch (SQLException e) {
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "setCompactionHighestWriteId(" + ci + "," + 
highestWriteId + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(null, stmt, dbConn);
+       }
+     } catch (RetryException ex) {
+       setCompactionHighestWriteId(ci, highestWriteId);
+     }
+   }
+   private static class RetentionCounters {
+     int attemptedRetention = 0;
+     int failedRetention = 0;
+     int succeededRetention = 0;
+     RetentionCounters(int attemptedRetention, int failedRetention, int 
succeededRetention) {
+       this.attemptedRetention = attemptedRetention;
+       this.failedRetention = failedRetention;
+       this.succeededRetention = succeededRetention;
+     }
+   }
+   private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, 
RetentionCounters rc) {
+     switch (ci.state) {
+       case ATTEMPTED_STATE:
+         if(--rc.attemptedRetention < 0) {
+           deleteSet.add(ci.id);
+         }
+         break;
+       case FAILED_STATE:
+         if(--rc.failedRetention < 0) {
+           deleteSet.add(ci.id);
+         }
+         break;
+       case SUCCEEDED_STATE:
+         if(--rc.succeededRetention < 0) {
+           deleteSet.add(ci.id);
+         }
+         break;
+       default:
+         //do nothing to hanlde future RU/D where we may want to add new state 
types
+     }
+   }
+ 
+   /**
+    * For any given compactable entity (partition; table if not partitioned) 
the history of compactions
+    * may look like "sssfffaaasffss", for example.  The idea is to retain the 
tail (most recent) of the
+    * history such that a configurable number of each type of state is 
present.  Any other entries
+    * can be purged.  This scheme has advantage of always retaining the last 
failure/success even if
+    * it's not recent.
+    * @throws MetaException
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public void purgeCompactionHistory() throws MetaException {
+     Connection dbConn = null;
+     Statement stmt = null;
+     PreparedStatement pStmt = null;
+     ResultSet rs = null;
+     List<Long> deleteSet = new ArrayList<>();
+     RetentionCounters rc = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         /*cc_id is monotonically increasing so for any entity sorts in order 
of compaction history,
+         thus this query groups by entity and withing group sorts most recent 
first*/
+         rs = stmt.executeQuery("select cc_id, cc_database, cc_table, 
cc_partition, cc_state from " +
+           "COMPLETED_COMPACTIONS order by cc_database, cc_table, 
cc_partition, cc_id desc");
+         String lastCompactedEntity = null;
+         /*In each group, walk from most recent and count occurences of each 
state type.  Once you
+         * have counted enough (for each state) to satisfy retention policy, 
delete all other
+         * instances of this status.*/
+         while(rs.next()) {
+           CompactionInfo ci = new CompactionInfo(rs.getLong(1), 
rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0));
+           if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
+             lastCompactedEntity = ci.getFullPartitionName();
+             rc = new RetentionCounters(MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+               getFailedCompactionRetention(),
+               MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
+           }
+           checkForDeletion(deleteSet, ci, rc);
+         }
+         close(rs);
+ 
+         if (deleteSet.size() <= 0) {
+           return;
+         }
+ 
+         List<String> queries = new ArrayList<>();
+ 
+         StringBuilder prefix = new StringBuilder();
+         StringBuilder suffix = new StringBuilder();
+ 
+         prefix.append("delete from COMPLETED_COMPACTIONS where ");
+         suffix.append("");
+ 
+         List<String> questions = new ArrayList<>(deleteSet.size());
+         for (int  i = 0; i < deleteSet.size(); i++) {
+           questions.add("?");
+         }
+         List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, 
queries, prefix, suffix, questions, "cc_id", false, false);
+         int totalCount = 0;
+         for (int i = 0; i < queries.size(); i++) {
+           String query = queries.get(i);
+           long insertCount = counts.get(i);
+           LOG.debug("Going to execute update <" + query + ">");
+           pStmt = dbConn.prepareStatement(query);
+           for (int j = 0; j < insertCount; j++) {
+             pStmt.setLong(j + 1, deleteSet.get(totalCount + j));
+           }
+           totalCount += insertCount;
+           int count = pStmt.executeUpdate();
+           LOG.debug("Removed " + count + " records from 
COMPLETED_COMPACTIONS");
+         }
+         dbConn.commit();
+       } catch (SQLException e) {
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "purgeCompactionHistory()");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+         closeStmt(pStmt);
+       }
+     } catch (RetryException ex) {
+       purgeCompactionHistory();
+     }
+   }
+   /**
+    * this ensures that the number of failed compaction entries retained is > 
than number of failed
+    * compaction threshold which prevents new compactions from being scheduled.
+    */
+   private int getFailedCompactionRetention() {
+     int failedThreshold = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+     int failedRetention = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
+     if(failedRetention < failedThreshold) {
+       LOG.warn("Invalid configuration " + ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.getVarname() +
+         "=" + failedRetention + " < " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname() + "=" +
+         failedThreshold + ".  Will use " + ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.getVarname() +
+         "=" + failedThreshold);
+       failedRetention = failedThreshold;
+     }
+     return failedRetention;
+   }
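
In effect the check above just clamps the retention value so it is never below the Initiator's
failure threshold; a hedged one-liner with the same intent (using the conf vars already
referenced above):

    // Sketch: retain at least as many failed history entries as the failure threshold.
    int effectiveFailedRetention = Math.max(
        MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),
        MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD));
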
+   /**
+    * Returns {@code true} if there is already a sufficient number of consecutive failures for
+    * this table/partition, so that no new automatic compactions will be scheduled.
+    * User-initiated compactions don't do this check.
+    *
+    * Do we allow compacting a whole table (when it's partitioned)?  No, though perhaps we should.
+    * That would be a meta operation, i.e. first find all partitions for this table (which have
+    * txn info) and schedule each compaction separately.  This avoids complications in this logic.
+    */
+   @Override
+   @RetrySemantics.ReadOnly
+   public boolean checkFailedCompactions(CompactionInfo ci) throws 
MetaException {
+     Connection dbConn = null;
+     PreparedStatement pStmt = null;
+     ResultSet rs = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         pStmt = dbConn.prepareStatement("select CC_STATE from 
COMPLETED_COMPACTIONS where " +
+           "CC_DATABASE = ? and " +
+           "CC_TABLE = ? " +
+           (ci.partName != null ? "and CC_PARTITION = ?" : "") +
+           " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID 
desc");
+         pStmt.setString(1, ci.dbname);
+         pStmt.setString(2, ci.tableName);
+         if (ci.partName != null) {
+           pStmt.setString(3, ci.partName);
+         }
+         rs = pStmt.executeQuery();
+         int numFailed = 0;
+         int numTotal = 0;
+         int failedThreshold = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+         while(rs.next() && ++numTotal <= failedThreshold) {
+           if(rs.getString(1).charAt(0) == FAILED_STATE) {
+             numFailed++;
+           }
+           else {
+             numFailed--;
+           }
+         }
+         return numFailed == failedThreshold;
+       }
+       catch (SQLException e) {
+         LOG.error("Unable to check for failed compactions " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
+         LOG.error("Unable to connect to transaction database " + 
StringUtils.stringifyException(e));
+         return false;//weren't able to check
+       } finally {
+         close(rs, pStmt, dbConn);
+       }
+     } catch (RetryException e) {
+       return checkFailedCompactions(ci);
+     }
+   }
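
The loop above answers: were the most recent failedThreshold (non-attempted) compactions for
this entity all failures?  A small self-contained sketch of the same counting rule on an
in-memory, newest-first list of CC_STATE characters ('f' stands for FAILED_STATE, per
markFailed() below; the class itself is illustrative):

    import java.util.List;

    /** Sketch of the consecutive-failure rule used by checkFailedCompactions (illustrative). */
    final class FailedCompactionRuleSketch {
      /** @param statesNewestFirst CC_STATE chars ordered by CC_ID desc, attempted entries excluded */
      static boolean autoCompactionBlocked(List<Character> statesNewestFirst, int failedThreshold) {
        int numFailed = 0;
        int numTotal = 0;
        for (char state : statesNewestFirst) {
          if (++numTotal > failedThreshold) {
            break;                            // only the most recent failedThreshold entries count
          }
          if (state == 'f') {
            numFailed++;                      // a failed compaction
          } else {
            numFailed--;                      // any success breaks the streak
          }
        }
        return numFailed == failedThreshold;  // true only if every one of them failed
      }
    }
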
+   /**
+    * If there is an entry in compaction_queue with ci.id, remove it and
+    * make an entry in completed_compactions with status 'f'.
+    * If there is no entry in compaction_queue, it means the Initiator failed to even schedule
+    * a compaction, which we record as an ATTEMPTED_STATE entry in history.
+    */
+   @Override
+   @RetrySemantics.CannotRetry
+   public void markFailed(CompactionInfo ci) throws MetaException {//todo: 
this should not throw
+     //todo: this should take "comment" as parameter to set in CC_META_INFO to 
provide some context for the failure
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       PreparedStatement pStmt = null;
+       ResultSet rs = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, 
CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, 
CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from 
COMPACTION_QUEUE WHERE CQ_ID = ?");
+         pStmt.setLong(1, ci.id);
+         rs = pStmt.executeQuery();
+         if(rs.next()) {
+           ci = CompactionInfo.loadFullFromCompactionQueue(rs);
+           String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+           pStmt = dbConn.prepareStatement(s);
+           pStmt.setLong(1, ci.id);
+           LOG.debug("Going to execute update <" + s + ">");
+           int updCnt = pStmt.executeUpdate();
+         }
+         else {
+           if(ci.id > 0) {
+             //the record with valid CQ_ID has disappeared - this is a sign of 
something wrong
+             throw new IllegalStateException("No record with CQ_ID=" + ci.id + 
" found in COMPACTION_QUEUE");
+           }
+         }
+         if(ci.id == 0) {
+           //The failure occurred before we even made an entry in 
COMPACTION_QUEUE
+           //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
+           ci.id = generateCompactionQueueId(stmt);
+           //mostly this indicates that the Initiator is paying attention to 
some table even though
+           //compactions are not happening.
+           ci.state = ATTEMPTED_STATE;
+           //this is not strictly accurate, but 'type' cannot be null.
+           if(ci.type == null) { ci.type = CompactionType.MINOR; }
+           ci.start = getDbTime(dbConn);
+         }
+         else {
+           ci.state = FAILED_STATE;
+         }
+         close(rs, stmt, null);
+         closeStmt(pStmt);
+ 
+         pStmt = dbConn.prepareStatement("insert into 
COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, 
CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, 
CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, 
?,?,?,?,?, ?,?,?,?)");
+         CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, 
getDbTime(dbConn));
+         int updCount = pStmt.executeUpdate();
+         LOG.debug("Going to commit");
+         closeStmt(pStmt);
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         try {
+           checkRetryable(dbConn, e, "markFailed(" + ci + ")");
+         }
+         catch(MetaException ex) {
+           LOG.error("Unable to connect to transaction database " + 
StringUtils.stringifyException(ex));
+         }
+         LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
+       } finally {
+         close(rs, stmt, null);
+         close(null, pStmt, dbConn);
+       }
+     } catch (RetryException e) {
+       markFailed(ci);
+     }
+   }
+   @Override
+   @RetrySemantics.Idempotent
+   public void setHadoopJobId(String hadoopJobId, long id) {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + 
quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+         LOG.debug("Going to execute <" + s + ">");
+         int updateCount = stmt.executeUpdate(s);
+         LOG.debug("Going to commit");
+         closeStmt(stmt);
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + 
e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         try {
+           checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + 
id + ")");
+         }
+         catch(MetaException ex) {
+           LOG.error("Unable to connect to transaction database " + 
StringUtils.stringifyException(ex));
+         }
+         LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + 
e.getMessage(), e);
+       } finally {
+         close(null, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       setHadoopJobId(hadoopJobId, id);
+     }
+   }
+ }
+ 
+ 

http://git-wip-us.apache.org/repos/asf/hive/blob/93b9cdd6/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --cc 
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 0000000,f8c2ca2..319e612
mode 000000,100644..100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@@ -1,0 -1,505 +1,599 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hive.metastore.txn;
+ 
+ import java.sql.Connection;
+ import java.sql.Driver;
+ import java.sql.PreparedStatement;
+ import java.sql.ResultSet;
+ import java.sql.ResultSetMetaData;
+ import java.sql.SQLException;
+ import java.sql.SQLTransactionRollbackException;
+ import java.sql.Statement;
+ import java.util.Properties;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hive.metastore.api.MetaException;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Utility methods for creating and destroying txn database/schema, plus 
methods for
+  * querying against metastore tables.
+  * Placed here in a separate class so it can be shared across unit tests.
+  */
+ public final class TxnDbUtil {
+ 
+   static final private Logger LOG = 
LoggerFactory.getLogger(TxnDbUtil.class.getName());
+   private static final String TXN_MANAGER = 
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+ 
+   private static int deadlockCnt = 0;
+ 
+   private TxnDbUtil() {
+     throw new UnsupportedOperationException("Can't initialize class");
+   }
+ 
+   /**
+    * Set up the configuration so it will use the DbTxnManager, concurrency 
will be set to true,
+    * and the JDBC configs will be set for putting the transaction and lock 
info in the embedded
+    * metastore.
+    *
+    * @param conf HiveConf to add these values to
+    */
+   public static void setConfValues(Configuration conf) {
+     MetastoreConf.setVar(conf, ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER);
+     MetastoreConf.setBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+   }
+ 
+   public static void prepDb(Configuration conf) throws Exception {
+     // This is a bogus hack because it copies the contents of the SQL file
+     // intended for creating derby databases, and thus will inexorably get
+     // out of date with it.  I'm open to any suggestions on how to make this
+     // read the file in a build friendly way.
+ 
+     Connection conn = null;
+     Statement stmt = null;
+     try {
+       conn = getConnection(conf);
+       stmt = conn.createStatement();
+       stmt.execute("CREATE TABLE TXNS (" +
+           "  TXN_ID bigint PRIMARY KEY," +
+           "  TXN_STATE char(1) NOT NULL," +
+           "  TXN_STARTED bigint NOT NULL," +
+           "  TXN_LAST_HEARTBEAT bigint NOT NULL," +
+           "  TXN_USER varchar(128) NOT NULL," +
+           "  TXN_HOST varchar(128) NOT NULL," +
+           "  TXN_TYPE integer)");
+ 
+       stmt.execute("CREATE TABLE TXN_COMPONENTS (" +
+           "  TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID)," +
+           "  TC_DATABASE varchar(128) NOT NULL," +
+           "  TC_TABLE varchar(128)," +
+           "  TC_PARTITION varchar(767)," +
+           "  TC_OPERATION_TYPE char(1) NOT NULL," +
+           "  TC_WRITEID bigint)");
+       stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
+           "  CTC_TXNID bigint NOT NULL," +
+           "  CTC_DATABASE varchar(128) NOT NULL," +
+           "  CTC_TABLE varchar(128)," +
+           "  CTC_PARTITION varchar(767)," +
+           "  CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, 
INCREMENT BY 1) NOT NULL," +
+           "  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," +
+           "  CTC_WRITEID bigint)");
+       stmt.execute("CREATE TABLE NEXT_TXN_ID (" + "  NTXN_NEXT bigint NOT 
NULL)");
+       stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
+ 
+       stmt.execute("CREATE TABLE TXN_TO_WRITE_ID (" +
+           " T2W_TXNID bigint NOT NULL," +
+           " T2W_DATABASE varchar(128) NOT NULL," +
+           " T2W_TABLE varchar(256) NOT NULL," +
+           " T2W_WRITEID bigint NOT NULL)");
+       stmt.execute("CREATE TABLE NEXT_WRITE_ID (" +
+           " NWI_DATABASE varchar(128) NOT NULL," +
+           " NWI_TABLE varchar(256) NOT NULL," +
+           " NWI_NEXT bigint NOT NULL)");
+ 
+       stmt.execute("CREATE TABLE MIN_HISTORY_LEVEL (" +
+           " MHL_TXNID bigint NOT NULL," +
+           " MHL_MIN_OPEN_TXNID bigint NOT NULL," +
+           " PRIMARY KEY(MHL_TXNID))");
+ 
+       stmt.execute("CREATE TABLE HIVE_LOCKS (" +
+           " HL_LOCK_EXT_ID bigint NOT NULL," +
+           " HL_LOCK_INT_ID bigint NOT NULL," +
+           " HL_TXNID bigint NOT NULL," +
+           " HL_DB varchar(128) NOT NULL," +
+           " HL_TABLE varchar(128)," +
+           " HL_PARTITION varchar(767)," +
+           " HL_LOCK_STATE char(1) NOT NULL," +
+           " HL_LOCK_TYPE char(1) NOT NULL," +
+           " HL_LAST_HEARTBEAT bigint NOT NULL," +
+           " HL_ACQUIRED_AT bigint," +
+           " HL_USER varchar(128) NOT NULL," +
+           " HL_HOST varchar(128) NOT NULL," +
+           " HL_HEARTBEAT_COUNT integer," +
+           " HL_AGENT_INFO varchar(128)," +
+           " HL_BLOCKEDBY_EXT_ID bigint," +
+           " HL_BLOCKEDBY_INT_ID bigint," +
+         " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+       stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
+ 
+       stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT 
NULL)");
+       stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
+ 
+       stmt.execute("CREATE TABLE COMPACTION_QUEUE (" +
+           " CQ_ID bigint PRIMARY KEY," +
+           " CQ_DATABASE varchar(128) NOT NULL," +
+           " CQ_TABLE varchar(128) NOT NULL," +
+           " CQ_PARTITION varchar(767)," +
+           " CQ_STATE char(1) NOT NULL," +
+           " CQ_TYPE char(1) NOT NULL," +
+           " CQ_TBLPROPERTIES varchar(2048)," +
+           " CQ_WORKER_ID varchar(128)," +
+           " CQ_START bigint," +
+           " CQ_RUN_AS varchar(128)," +
+           " CQ_HIGHEST_WRITE_ID bigint," +
+           " CQ_META_INFO varchar(2048) for bit data," +
+           " CQ_HADOOP_JOB_ID varchar(32))");
+ 
+       stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint 
NOT NULL)");
+       stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
+ 
+       stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
+           " CC_ID bigint PRIMARY KEY," +
+           " CC_DATABASE varchar(128) NOT NULL," +
+           " CC_TABLE varchar(128) NOT NULL," +
+           " CC_PARTITION varchar(767)," +
+           " CC_STATE char(1) NOT NULL," +
+           " CC_TYPE char(1) NOT NULL," +
+           " CC_TBLPROPERTIES varchar(2048)," +
+           " CC_WORKER_ID varchar(128)," +
+           " CC_START bigint," +
+           " CC_END bigint," +
+           " CC_RUN_AS varchar(128)," +
+           " CC_HIGHEST_WRITE_ID bigint," +
+           " CC_META_INFO varchar(2048) for bit data," +
+           " CC_HADOOP_JOB_ID varchar(32))");
+ 
+       stmt.execute("CREATE TABLE AUX_TABLE (" +
+         " MT_KEY1 varchar(128) NOT NULL," +
+         " MT_KEY2 bigint NOT NULL," +
+         " MT_COMMENT varchar(255)," +
+         " PRIMARY KEY(MT_KEY1, MT_KEY2))");
+ 
+       stmt.execute("CREATE TABLE WRITE_SET (" +
+         " WS_DATABASE varchar(128) NOT NULL," +
+         " WS_TABLE varchar(128) NOT NULL," +
+         " WS_PARTITION varchar(767)," +
+         " WS_TXNID bigint NOT NULL," +
+         " WS_COMMIT_ID bigint NOT NULL," +
+         " WS_OPERATION_TYPE char(1) NOT NULL)"
+       );
+ 
+       stmt.execute("CREATE TABLE REPL_TXN_MAP (" +
+           " RTM_REPL_POLICY varchar(256) NOT NULL, " +
+           " RTM_SRC_TXN_ID bigint NOT NULL, " +
+           " RTM_TARGET_TXN_ID bigint NOT NULL, " +
+           " PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID))"
+       );
+ 
+       try {
++        stmt.execute("CREATE TABLE \"APP\".\"TBLS\" (\"TBL_ID\" BIGINT NOT 
NULL, " +
++            " \"CREATE_TIME\" INTEGER NOT NULL, \"DB_ID\" BIGINT, 
\"LAST_ACCESS_TIME\" INTEGER NOT NULL, " +
++            " \"OWNER\" VARCHAR(767), \"OWNER_TYPE\" VARCHAR(10), 
\"RETENTION\" INTEGER NOT NULL, " +
++            " \"SD_ID\" BIGINT, \"TBL_NAME\" VARCHAR(256), \"TBL_TYPE\" 
VARCHAR(128), " +
++            " \"VIEW_EXPANDED_TEXT\" LONG VARCHAR, \"VIEW_ORIGINAL_TEXT\" 
LONG VARCHAR, " +
++            " \"IS_REWRITE_ENABLED\" CHAR(1) NOT NULL DEFAULT \'N\', " +
++            " \"WRITE_ID\" BIGINT DEFAULT 0, " +
++            " PRIMARY KEY (TBL_ID))"
++        );
++      } catch (SQLException e) {
++        if (e.getMessage() != null && e.getMessage().contains("already 
exists")) {
++          LOG.info("TBLS table already exist, ignoring");
++        } else {
++          throw e;
++        }
++      }
++
++      try {
++        stmt.execute("CREATE TABLE \"APP\".\"PARTITIONS\" (" +
++            " \"PART_ID\" BIGINT NOT NULL, \"CREATE_TIME\" INTEGER NOT NULL, 
" +
++            " \"LAST_ACCESS_TIME\" INTEGER NOT NULL, \"PART_NAME\" 
VARCHAR(767), " +
++            " \"SD_ID\" BIGINT, \"TBL_ID\" BIGINT, " +
++            " \"WRITE_ID\" BIGINT DEFAULT 0, " +
++            " PRIMARY KEY (PART_ID))"
++        );
++      } catch (SQLException e) {
++        if (e.getMessage() != null && e.getMessage().contains("already 
exists")) {
++          LOG.info("PARTITIONS table already exist, ignoring");
++        } else {
++          throw e;
++        }
++      }
++
++      try {
++        stmt.execute("CREATE TABLE \"APP\".\"TABLE_PARAMS\" (" +
++            " \"TBL_ID\" BIGINT NOT NULL, \"PARAM_KEY\" VARCHAR(256) NOT 
NULL, " +
++            " \"PARAM_VALUE\" CLOB, " +
++            " PRIMARY KEY (TBL_ID, PARAM_KEY))"
++        );
++      } catch (SQLException e) {
++        if (e.getMessage() != null && e.getMessage().contains("already 
exists")) {
++          LOG.info("TABLE_PARAMS table already exist, ignoring");
++        } else {
++          throw e;
++        }
++      }
++
++      try {
++        stmt.execute("CREATE TABLE \"APP\".\"PARTITION_PARAMS\" (" +
++            " \"PART_ID\" BIGINT NOT NULL, \"PARAM_KEY\" VARCHAR(256) NOT 
NULL, " +
++            " \"PARAM_VALUE\" VARCHAR(4000), " +
++            " PRIMARY KEY (PART_ID, PARAM_KEY))"
++        );
++      } catch (SQLException e) {
++        if (e.getMessage() != null && e.getMessage().contains("already 
exists")) {
++          LOG.info("PARTITION_PARAMS table already exist, ignoring");
++        } else {
++          throw e;
++        }
++      }
++
++      try {
+         stmt.execute("CREATE TABLE \"APP\".\"SEQUENCE_TABLE\" 
(\"SEQUENCE_NAME\" VARCHAR(256) NOT " +
+ 
+                 "NULL, \"NEXT_VAL\" BIGINT NOT NULL)"
+         );
+       } catch (SQLException e) {
+         if (e.getMessage() != null && e.getMessage().contains("already 
exists")) {
+           LOG.info("SEQUENCE_TABLE table already exist, ignoring");
+         } else {
+           throw e;
+         }
+       }
+ 
+       try {
+         stmt.execute("CREATE TABLE \"APP\".\"NOTIFICATION_SEQUENCE\" 
(\"NNI_ID\" BIGINT NOT NULL, " +
+ 
+                 "\"NEXT_EVENT_ID\" BIGINT NOT NULL)"
+         );
+       } catch (SQLException e) {
+         if (e.getMessage() != null && e.getMessage().contains("already 
exists")) {
+           LOG.info("NOTIFICATION_SEQUENCE table already exist, ignoring");
+         } else {
+           throw e;
+         }
+       }
+ 
+       try {
+         stmt.execute("CREATE TABLE \"APP\".\"NOTIFICATION_LOG\" (\"NL_ID\" 
BIGINT NOT NULL, " +
+                 "\"DB_NAME\" VARCHAR(128), \"EVENT_ID\" BIGINT NOT NULL, 
\"EVENT_TIME\" INTEGER NOT" +
+ 
+                 " NULL, \"EVENT_TYPE\" VARCHAR(32) NOT NULL, \"MESSAGE\" 
CLOB, \"TBL_NAME\" " +
+                 "VARCHAR" +
+                 "(256), \"MESSAGE_FORMAT\" VARCHAR(16))"
+         );
+       } catch (SQLException e) {
+         if (e.getMessage() != null && e.getMessage().contains("already 
exists")) {
+           LOG.info("NOTIFICATION_LOG table already exist, ignoring");
+         } else {
+           throw e;
+         }
+       }
+ 
+       stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" 
(\"SEQUENCE_NAME\", \"NEXT_VAL\") " +
+               "SELECT * FROM (VALUES 
('org.apache.hadoop.hive.metastore.model.MNotificationLog', " +
+               "1)) tmp_table WHERE NOT EXISTS ( SELECT \"NEXT_VAL\" FROM 
\"APP\"" +
+               ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 
'org.apache.hadoop.hive.metastore" +
+               ".model.MNotificationLog')");
+ 
+       stmt.execute("INSERT INTO \"APP\".\"NOTIFICATION_SEQUENCE\" 
(\"NNI_ID\", \"NEXT_EVENT_ID\")" +
+               " SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( 
SELECT " +
+               "\"NEXT_EVENT_ID\" FROM \"APP\".\"NOTIFICATION_SEQUENCE\")");
+ 
+       try {
+         stmt.execute("CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (" +
+                 "WNL_ID bigint NOT NULL," +
+                 "WNL_TXNID bigint NOT NULL," +
+                 "WNL_WRITEID bigint NOT NULL," +
+                 "WNL_DATABASE varchar(128) NOT NULL," +
+                 "WNL_TABLE varchar(128) NOT NULL," +
+                 "WNL_PARTITION varchar(1024) NOT NULL," +
+                 "WNL_TABLE_OBJ clob NOT NULL," +
+                 "WNL_PARTITION_OBJ clob," +
+                 "WNL_FILES clob," +
+                 "WNL_EVENT_TIME integer NOT NULL," +
+                 "PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, 
WNL_PARTITION))"
+         );
+       } catch (SQLException e) {
+         if (e.getMessage() != null && e.getMessage().contains("already 
exists")) {
+           LOG.info("TXN_WRITE_NOTIFICATION_LOG table already exist, 
ignoring");
+         } else {
+           throw e;
+         }
+       }
+ 
+       stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" 
(\"SEQUENCE_NAME\", \"NEXT_VAL\") " +
+               "SELECT * FROM (VALUES 
('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', " +
+               "1)) tmp_table WHERE NOT EXISTS ( SELECT \"NEXT_VAL\" FROM 
\"APP\"" +
+               ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 
'org.apache.hadoop.hive.metastore" +
+               ".model.MTxnWriteNotificationLog')");
+     } catch (SQLException e) {
+       try {
+         conn.rollback();
+       } catch (SQLException re) {
+         LOG.error("Error rolling back: " + re.getMessage());
+       }
+ 
+       // Another thread might have already created these tables.
+       if (e.getMessage() != null && e.getMessage().contains("already 
exists")) {
+         LOG.info("Txn tables already exist, returning");
+         return;
+       }
+ 
+       // This might be a deadlock, if so, let's retry
+       if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
+         LOG.warn("Caught deadlock, retrying db creation");
+         prepDb(conf);
+       } else {
+         throw e;
+       }
+     } finally {
+       deadlockCnt = 0;
+       closeResources(conn, stmt, null);
+     }
+   }
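
Because the class comment above describes these helpers as shared test utilities, a typical
setup/teardown sequence in a unit test would look roughly like the following; the surrounding
test structure and the use of MetastoreConf.newMetastoreConf() are illustrative assumptions,
not part of this patch:

    // Sketch of a test lifecycle around the txn schema.
    Configuration conf = MetastoreConf.newMetastoreConf();
    TxnDbUtil.setConfValues(conf);   // DbTxnManager + concurrency against the embedded metastore db
    TxnDbUtil.prepDb(conf);          // create TXNS, HIVE_LOCKS, COMPACTION_QUEUE, etc.
    try {
      // ... exercise the transactional metastore code under test ...
    } finally {
      TxnDbUtil.cleanDb(conf);       // drop the txn tables so the next test starts from a clean schema
    }
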
+ 
+   public static void cleanDb(Configuration conf) throws Exception {
+     int retryCount = 0;
+     while(++retryCount <= 3) {
+       boolean success = true;
+       Connection conn = null;
+       Statement stmt = null;
+       try {
+         conn = getConnection(conf);
+         stmt = conn.createStatement();
+ 
+         // We want to try these, whether they succeed or fail.
+         try {
+           stmt.execute("DROP INDEX HL_TXNID_INDEX");
+         } catch (SQLException e) {
+           if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) 
{
+             //42X65/30000 means the index doesn't exist
+             LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() 
+
+               "State=" + e.getSQLState() + " code=" + e.getErrorCode() + " 
retryCount=" + retryCount);
+             success = false;
+           }
+         }
+ 
+         success &= dropTable(stmt, "TXN_COMPONENTS", retryCount);
+         success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount);
+         success &= dropTable(stmt, "TXNS", retryCount);
+         success &= dropTable(stmt, "NEXT_TXN_ID", retryCount);
+         success &= dropTable(stmt, "TXN_TO_WRITE_ID", retryCount);
+         success &= dropTable(stmt, "NEXT_WRITE_ID", retryCount);
+         success &= dropTable(stmt, "MIN_HISTORY_LEVEL", retryCount);
+         success &= dropTable(stmt, "HIVE_LOCKS", retryCount);
+         success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount);
+         success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount);
+         success &= dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID", retryCount);
+         success &= dropTable(stmt, "COMPLETED_COMPACTIONS", retryCount);
+         success &= dropTable(stmt, "AUX_TABLE", retryCount);
+         success &= dropTable(stmt, "WRITE_SET", retryCount);
+         success &= dropTable(stmt, "REPL_TXN_MAP", retryCount);
+         /*
+          * Don't drop NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE, as they are used
+          * by other, non-txn tables to generate primary keys. If these tables were dropped while the
+          * other tables were not, duplicate-key errors would occur when inserting into those tables.
+          */
+       } finally {
+         closeResources(conn, stmt, null);
+       }
+       if(success) {
+         return;
+       }
+     }
+     throw new RuntimeException("Failed to clean up txn tables");
+   }
+ 
+   private static boolean dropTable(Statement stmt, String name, int 
retryCount) throws SQLException {
+     for (int i = 0; i < 3; i++) {
+       try {
+         stmt.execute("DROP TABLE " + name);
+         LOG.debug("Successfully dropped table " + name);
+         return true;
+       } catch (SQLException e) {
+         if ("42Y55".equals(e.getSQLState()) && 30000 == e.getErrorCode()) {
+           LOG.debug("Not dropping " + name + " because it doesn't exist");
+           //failed because object doesn't exist
+           return true;
+         }
+         if ("X0Y25".equals(e.getSQLState()) && 30000 == e.getErrorCode()) {
+           // Intermittent failure
+           LOG.warn("Intermittent drop failure, retrying, try number " + i);
+           continue;
+         }
+         LOG.error("Unable to drop table " + name + ": " + e.getMessage() +
+             " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " 
retryCount=" + retryCount);
+       }
+     }
+     LOG.error("Failed to drop table, don't know why");
+     return false;
+   }
+ 
+   /**
+    * A tool to count the number of partitions, tables,
+    * and databases locked by a particular lockId.
+    *
+    * @param lockId lock id to look for lock components
+    *
+    * @return number of components, or 0 if there is no lock
+    */
+   public static int countLockComponents(Configuration conf, long lockId) 
throws Exception {
+     Connection conn = null;
+     PreparedStatement stmt = null;
+     ResultSet rs = null;
+     try {
+       conn = getConnection(conf);
+       stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE 
hl_lock_ext_id = ?");
+       stmt.setLong(1, lockId);
+       rs = stmt.executeQuery();
+       if (!rs.next()) {
+         return 0;
+       }
+       return rs.getInt(1);
+     } finally {
+       closeResources(conn, stmt, rs);
+     }
+   }
+ 
+   /**
++   * Checks whether the transaction with the given txnId is still present in the TXNS table,
++   * i.e. whether it is open or aborted.
++   * @param conf    Configuration used to connect to the metastore database
++   * @param txnId   transaction id to search for
++   * @return true if the transaction is open or aborted, false otherwise
++   * @throws Exception
++   */
++  public static boolean isOpenOrAbortedTransaction(Configuration conf, long 
txnId) throws Exception {
++    Connection conn = null;
++    PreparedStatement stmt = null;
++    ResultSet rs = null;
++    try {
++      conn = getConnection(conf);
++      conn.setAutoCommit(false);
++      conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
++
++      stmt = conn.prepareStatement("SELECT txn_id FROM TXNS WHERE txn_id = 
?");
++      stmt.setLong(1, txnId);
++      rs = stmt.executeQuery();
++      return rs.next();
++    } finally {
++      closeResources(conn, stmt, rs);
++    }
++  }
++
++  /**
+    * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables.
+    * @param countQuery the COUNT query text to execute
+    * @return the count returned by the query
+    * @throws Exception
+    */
+   public static int countQueryAgent(Configuration conf, String countQuery) 
throws Exception {
+     Connection conn = null;
+     Statement stmt = null;
+     ResultSet rs = null;
+     try {
+       conn = getConnection(conf);
+       stmt = conn.createStatement();
+       rs = stmt.executeQuery(countQuery);
+       if (!rs.next()) {
+         return 0;
+       }
+       return rs.getInt(1);
+     } finally {
+       closeResources(conn, stmt, rs);
+     }
+   }
+   @VisibleForTesting
+   public static String queryToString(Configuration conf, String query) throws 
Exception {
+     return queryToString(conf, query, true);
+   }
+   public static String queryToString(Configuration conf, String query, 
boolean includeHeader)
+       throws Exception {
+     Connection conn = null;
+     Statement stmt = null;
+     ResultSet rs = null;
+     StringBuilder sb = new StringBuilder();
+     try {
+       conn = getConnection(conf);
+       stmt = conn.createStatement();
+       rs = stmt.executeQuery(query);
+       ResultSetMetaData rsmd = rs.getMetaData();
+       if(includeHeader) {
+         for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+           sb.append(rsmd.getColumnName(colPos)).append("   ");
+         }
+         sb.append('\n');
+       }
+       while(rs.next()) {
+         for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+           sb.append(rs.getObject(colPos)).append("   ");
+         }
+         sb.append('\n');
+       }
+     } finally {
+       closeResources(conn, stmt, rs);
+     }
+     return sb.toString();
+   }
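
A brief usage sketch of the two query helpers above, as they might appear in a test assertion
or debug log (the queries and the enclosing test method are illustrative):

    // Sketch: count rows and dump a table for debugging; both helpers throw Exception.
    int txnRows = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXNS");
    String queueDump = TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE");
    LOG.info("TXNS rows: " + txnRows + "\n" + queueDump);
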
+ 
+   static Connection getConnection(Configuration conf) throws Exception {
+     String jdbcDriver = MetastoreConf.getVar(conf, 
ConfVars.CONNECTION_DRIVER);
+     Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
+     Properties prop = new Properties();
+     String driverUrl = MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY);
+     String user = MetastoreConf.getVar(conf, ConfVars.CONNECTION_USER_NAME);
+     String passwd = MetastoreConf.getPassword(conf, 
MetastoreConf.ConfVars.PWD);
+     prop.setProperty("user", user);
+     prop.setProperty("password", passwd);
+     Connection conn = driver.connect(driverUrl, prop);
+     conn.setAutoCommit(true);
+     return conn;
+   }
+ 
+   static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
+     if (rs != null) {
+       try {
+         rs.close();
+       } catch (SQLException e) {
+         LOG.error("Error closing ResultSet: " + e.getMessage());
+       }
+     }
+ 
+     if (stmt != null) {
+       try {
+         stmt.close();
+       } catch (SQLException e) {
+         System.err.println("Error closing Statement: " + e.getMessage());
+       }
+     }
+ 
+     if (conn != null) {
+       try {
+         conn.rollback();
+       } catch (SQLException e) {
+         System.err.println("Error rolling back: " + e.getMessage());
+       }
+       try {
+         conn.close();
+       } catch (SQLException e) {
+         System.err.println("Error closing Connection: " + e.getMessage());
+       }
+     }
+   }
+ }
