Repository: hive
Updated Branches:
  refs/heads/branch-1 505c5585c -> 0aaddb7d7


HIVE-12439 : CompactionTxnHandler.markCleaned() and TxnHandler.openTxns() misc improvements (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0aaddb7d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0aaddb7d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0aaddb7d

Branch: refs/heads/branch-1
Commit: 0aaddb7d753a2936c973d9ab99e6edb2554f94ae
Parents: 505c558
Author: Wei Zheng <w...@apache.org>
Authored: Mon Mar 21 14:50:12 2016 -0700
Committer: Wei Zheng <w...@apache.org>
Committed: Mon Mar 21 14:50:12 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +
 .../metastore/txn/CompactionTxnHandler.java     | 120 +++++----
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |   4 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 245 ++++++++++++++-----
 .../hive/metastore/txn/TestTxnHandler.java      |  83 ++++++-
 5 files changed, 333 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0aaddb7d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4a575b3..b78bea2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -573,6 +573,13 @@ public class HiveConf extends Configuration {
         "select query has incorrect syntax or something similar inside a 
transaction, the\n" +
         "entire transaction will fail and fall-back to DataNucleus will not be 
possible. You\n" +
         "should disable the usage of direct SQL inside transactions if that 
happens in your case."),
+    METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH("hive.direct.sql.max.query.length", 
100, "The maximum\n" +
+        " size of a query string (in KB)."),
+    
METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE("hive.direct.sql.max.elements.in.clause",
 1000,
+        "The maximum number of values in a IN clause. Once exceeded, it will 
be broken into\n" +
+        " multiple OR separated IN clauses."),
+    
METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE("hive.direct.sql.max.elements.values.clause",
+        1000, "The maximum number of values in a VALUES clause for INSERT 
statement."),
     
METASTORE_ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS("hive.metastore.orm.retrieveMapNullsAsEmptyStrings",false,
         "Thrift does not support nulls in maps, so any nulls present in maps 
retrieved from ORM must " +
         "either be pruned or converted to empty strings. Some backing dbs such 
as Oracle persist empty strings " +

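The three new knobs above are read through HiveConf.getIntVar like any other integer setting. A minimal standalone sketch (class name is hypothetical; assumes hive-common and its default configuration on the classpath):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class DirectSqlLimitsSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Cap on the generated query string size, in KB (default 100)
        int maxQueryLengthKb =
            conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
        // Cap on the number of values per IN clause before it is split (default 1000)
        int maxInElements =
            conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
        // Cap on the number of rows per multi-row VALUES clause (default 1000)
        int maxValuesRows =
            conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
        System.out.println(maxQueryLengthKb + " KB / " + maxInElements + " / " + maxValuesRows);
      }
    }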
http://git-wip-us.apache.org/repos/asf/hive/blob/0aaddb7d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 4d736b9..28e06ed 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -364,36 +364,38 @@ public class CompactionTxnHandler extends TxnHandler {
         rs = stmt.executeQuery(s);
         List<Long> txnids = new ArrayList<>();
         while (rs.next()) txnids.add(rs.getLong(1));
+        // Remove entries from txn_components, as there may be aborted txn components
         if (txnids.size() > 0) {
+          List<String> queries = new ArrayList<String>();
+
+          // Prepare prefix and suffix
+          StringBuilder prefix = new StringBuilder();
+          StringBuilder suffix = new StringBuilder();
+
+          prefix.append("delete from TXN_COMPONENTS where ");
 
-          // Remove entries from txn_components, as there may be aborted txn components
-          StringBuilder buf = new StringBuilder();
-          //todo: add a safeguard to make sure IN clause is not too large; break up by txn id
-          buf.append("delete from TXN_COMPONENTS where tc_txnid in (");
-          boolean first = true;
-          for (long id : txnids) {
-            if (first) first = false;
-            else buf.append(", ");
-            buf.append(id);
-          }
           //because 1 txn may include different partitions/tables even in auto commit mode
-          buf.append(") and tc_database = '");
-          buf.append(info.dbname);
-          buf.append("' and tc_table = '");
-          buf.append(info.tableName);
-          buf.append("'");
+          suffix.append(" and tc_database = ");
+          suffix.append(quoteString(info.dbname));
+          suffix.append(" and tc_table = ");
+          suffix.append(quoteString(info.tableName));
           if (info.partName != null) {
-            buf.append(" and tc_partition = '");
-            buf.append(info.partName);
-            buf.append("'");
+            suffix.append(" and tc_partition = ");
+            suffix.append(quoteString(info.partName));
           }
-          LOG.debug("Going to execute update <" + buf.toString() + ">");
-          int rc = stmt.executeUpdate(buf.toString());
-          LOG.debug("Removed " + rc + " records from txn_components");
 
-          // Don't bother cleaning from the txns table.  A separate call will do that.  We don't
-          // know here which txns still have components from other tables or partitions in the
-          // table, so we don't know which ones we can and cannot clean.
+          // Populate the complete query with provided prefix and suffix
+          TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
+
+          for (String query : queries) {
+            LOG.debug("Going to execute update <" + query + ">");
+            int rc = stmt.executeUpdate(query);
+            LOG.debug("Removed " + rc + " records from txn_components");
+
+            // Don't bother cleaning from the txns table.  A separate call will do that.  We don't
+            // know here which txns still have components from other tables or partitions in the
+            // table, so we don't know which ones we can and cannot clean.
+          }
         }
 
         LOG.debug("Going to commit");
@@ -440,16 +442,25 @@ public class CompactionTxnHandler extends TxnHandler {
         if(txnids.size() <= 0) {
           return;
         }
-        for(int i = 0; i < txnids.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE; i++) {
-          List<Long> txnIdBatch = txnids.subList(i * TIMED_OUT_TXN_ABORT_BATCH_SIZE,
-            (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE);
-          deleteTxns(dbConn, stmt, txnIdBatch);
-        }
-        int partialBatchSize = txnids.size() % TIMED_OUT_TXN_ABORT_BATCH_SIZE;
-        if(partialBatchSize > 0) {
-          List<Long> txnIdBatch = txnids.subList(txnids.size() - partialBatchSize, txnids.size());
-          deleteTxns(dbConn, stmt, txnIdBatch);
+
+        List<String> queries = new ArrayList<String>();
+        StringBuilder prefix = new StringBuilder();
+        StringBuilder suffix = new StringBuilder();
+
+        prefix.append("delete from TXNS where ");
+        suffix.append("");
+
+        TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
+
+        for (String query : queries) {
+          LOG.debug("Going to execute update <" + query + ">");
+          int rc = stmt.executeUpdate(query);
+          LOG.info("Removed " + rc + "  empty Aborted transactions from TXNS");
         }
+        LOG.info("Aborted transactions removed from TXNS: " + txnids);
+
+        LOG.debug("Going to commit");
+        dbConn.commit();
       } catch (SQLException e) {
         LOG.error("Unable to delete from txns table " + e.getMessage());
         LOG.debug("Going to rollback");
@@ -464,18 +475,6 @@ public class CompactionTxnHandler extends TxnHandler {
       cleanEmptyAbortedTxns();
     }
   }
-  private static void deleteTxns(Connection dbConn, Statement stmt, List<Long> txnIdBatch) throws SQLException {
-    StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in (");
-    for(long txnid : txnIdBatch) {
-      buf.append(txnid).append(',');
-    }
-    buf.setCharAt(buf.length() - 1, ')');
-    LOG.debug("Going to execute update <" + buf + ">");
-    int rc = stmt.executeUpdate(buf.toString());
-    LOG.info("Removed " + rc + "  empty Aborted transactions: " + txnIdBatch + 
" from TXNS");
-    LOG.debug("Going to commit");
-    dbConn.commit();
-  }
 
   /**
    * This will take all entries assigned to workers
@@ -738,22 +737,21 @@ public class CompactionTxnHandler extends TxnHandler {
           checkForDeletion(deleteSet, ci, rc);
         }
         close(rs);
-        
-        String baseDeleteSql = "delete from COMPLETED_COMPACTIONS where cc_id IN(";
-        StringBuilder queryStr = new StringBuilder(baseDeleteSql);
-        for(int i = 0; i < deleteSet.size(); i++) {
-          if(i > 0 && i % TIMED_OUT_TXN_ABORT_BATCH_SIZE == 0) {
-            queryStr.setCharAt(queryStr.length() - 1, ')');
-            stmt.executeUpdate(queryStr.toString());
-            dbConn.commit();
-            queryStr = new StringBuilder(baseDeleteSql);
-          }
-          queryStr.append(deleteSet.get(i)).append(',');
-        }
-        if(queryStr.length() > baseDeleteSql.length()) {
-          queryStr.setCharAt(queryStr.length() - 1, ')');
-          int updCnt = stmt.executeUpdate(queryStr.toString());
-          dbConn.commit();
+
+        List<String> queries = new ArrayList<String>();
+
+        StringBuilder prefix = new StringBuilder();
+        StringBuilder suffix = new StringBuilder();
+
+        prefix.append("delete from COMPLETED_COMPACTIONS where ");
+        suffix.append("");
+
+        TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
+
+        for (String query : queries) {
+          LOG.debug("Going to execute update <" + query + ">");
+          int count = stmt.executeUpdate(query);
+          LOG.debug("Removed " + count + " records from 
COMPLETED_COMPACTIONS");
         }
         dbConn.commit();
       } catch (SQLException e) {

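To make the markCleaned() change concrete, here is a hedged sketch of the prefix/suffix pattern it now uses (class name, database, and table values are illustrative; with the default limits the whole IN list fits in one statement):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.txn.TxnHandler;

    public class MarkCleanedPatternSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        List<String> queries = new ArrayList<String>();
        StringBuilder prefix = new StringBuilder("delete from TXN_COMPONENTS where ");
        StringBuilder suffix = new StringBuilder(" and tc_database = 'default' and tc_table = 't'");
        List<Long> txnids = Arrays.asList(1L, 2L, 3L);
        // addParens=true wraps the OR'ed IN lists in parentheses so the suffix
        // conditions apply to the whole disjunction; notIn=false keeps plain IN.
        TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
        // With the default limits this produces a single statement:
        //   delete from TXN_COMPONENTS where (tc_txnid in (1,2,3))
        //     and tc_database = 'default' and tc_table = 't'
        for (String q : queries) {
          System.out.println(q);
        }
      }
    }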
http://git-wip-us.apache.org/repos/asf/hive/blob/0aaddb7d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 17e7c85..42415ac 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -248,7 +248,7 @@ public final class TxnDbUtil {
     }
   }
 
-  private static Connection getConnection() throws Exception {
+  static Connection getConnection() throws Exception {
     HiveConf conf = new HiveConf();
     String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
     Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
@@ -264,7 +264,7 @@ public final class TxnDbUtil {
     return conn;
   }
 
-  private static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
+  static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
     if (rs != null) {
       try {
         rs.close();

http://git-wip-us.apache.org/repos/asf/hive/blob/0aaddb7d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 59c1637..0ddc078 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -376,19 +376,46 @@ public class TxnHandler {
         s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
+
         long now = getDbTime(dbConn);
-        s = "insert into TXNS (txn_id, txn_state, txn_started, " +
-          "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + 
", " +
-          now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')";
-        LOG.debug("Going to prepare statement <" + s + ">");
-        PreparedStatement ps = dbConn.prepareStatement(s);
         List<Long> txnIds = new ArrayList<Long>(numTxns);
+        ArrayList<String> queries = new ArrayList<String>();
+        String query;
+        String insertClause = "insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host) values ";
+        StringBuilder valuesClause = new StringBuilder();
+
         for (long i = first; i < first + numTxns; i++) {
-          ps.setLong(1, i);
-          //todo: this would be more efficient with a single insert with multiple rows in values()
-          //need add a safeguard to not exceed the DB capabilities.
-          ps.executeUpdate();
           txnIds.add(i);
+
+          if (i > first &&
+              (i - first) % conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) {
+            // wrap up the current query, and start a new one
+            query = insertClause + valuesClause.toString();
+            queries.add(query);
+
+            valuesClause.setLength(0);
+            valuesClause.append("(").append(i).append(", 'o', 
").append(now).append(", ").append(now)
+                .append(", '").append(rqst.getUser()).append("', 
'").append(rqst.getHostname())
+                .append("')");
+
+            continue;
+          }
+
+          if (i > first) {
+            valuesClause.append(", ");
+          }
+
+          valuesClause.append("(").append(i).append(", 'o', 
").append(now).append(", ").append(now)
+              .append(", '").append(rqst.getUser()).append("', 
'").append(rqst.getHostname())
+              .append("')");
+        }
+
+        query = insertClause + valuesClause.toString();
+        queries.add(query);
+
+        for (String q : queries) {
+          LOG.debug("Going to execute update <" + q + ">");
+          stmt.execute(q);
         }
         LOG.debug("Going to commit");
         dbConn.commit();
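For context, the rewrite above replaces numTxns single-row executeUpdate() calls with a handful of multi-row inserts. With hive.direct.sql.max.elements.values.clause at its default of 1000, opening three transactions starting at id 7 would generate one statement shaped like the following (user, host, and the <now> timestamps are illustrative placeholders):

    insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host)
      values (7, 'o', <now>, <now>, 'hive', 'worker-1'),
        (8, 'o', <now>, <now>, 'hive', 'worker-1'),
        (9, 'o', <now>, <now>, 'hive', 'worker-1')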
@@ -1361,6 +1388,100 @@ public class TxnHandler {
   }
 
   /**
+   * Build a query (or queries if one query is too big) with specified "prefix" and "suffix",
+   * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6).
+   * For the NOT IN case, the NOT IN list is broken into multiple AND-separated clauses instead.
+   * @param queries list of complete query strings
+   * @param prefix part of the query that comes before the IN list
+   * @param suffix part of the query that comes after the IN list
+   * @param inList the list containing the IN list values
+   * @param inColumn column name of the IN list operator
+   * @param addParens whether to add a pair of parentheses around the IN lists,
+   *                  e.g. ( id in (1,2,3) OR id in (4,5,6) )
+   * @param notIn whether the clause to be broken up is NOT IN
+   */
+  public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix,
+                                            StringBuilder suffix, List<Long> inList,
+                                            String inColumn, boolean addParens, boolean notIn) {
+    int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
+    int numWholeBatches = inList.size() / batchSize;
+    StringBuilder buf = new StringBuilder();
+    buf.append(prefix);
+    if (addParens) {
+      buf.append("(");
+    }
+    buf.append(inColumn);
+    if (notIn) {
+      buf.append(" not in (");
+    } else {
+      buf.append(" in (");
+    }
+
+    for (int i = 0; i <= numWholeBatches; i++) {
+      if (needNewQuery(conf, buf)) {
+        // Wrap up current query string
+        if (addParens) {
+          buf.append(")");
+        }
+        buf.append(suffix);
+        queries.add(buf.toString());
+
+        // Prepare a new query string
+        buf.setLength(0);
+      }
+
+      if (i > 0) {
+        if (notIn) {
+          if (buf.length() == 0) {
+            buf.append(prefix);
+            if (addParens) {
+              buf.append("(");
+            }
+          } else {
+            buf.append(" and ");
+          }
+          buf.append(inColumn);
+          buf.append(" not in (");
+        } else {
+          if (buf.length() == 0) {
+            buf.append(prefix);
+            if (addParens) {
+              buf.append("(");
+            }
+          } else {
+            buf.append(" or ");
+          }
+          buf.append(inColumn);
+          buf.append(" in (");
+        }
+      }
+
+      if (i * batchSize == inList.size()) {
+        // At this point we just realized we don't need another query
+        return;
+      }
+      for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) {
+        buf.append(inList.get(j)).append(",");
+      }
+      buf.setCharAt(buf.length() - 1, ')');
+    }
+
+    if (addParens) {
+      buf.append(")");
+    }
+    buf.append(suffix);
+    queries.add(buf.toString());
+  }
+
+  /** Estimate whether the size of a string will exceed a certain limit */
+  private static boolean needNewQuery(HiveConf conf, StringBuilder sb) {
+    int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
+    // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
+    long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
+    return sizeInBytes / 1024 > queryMemoryLimit;
+  }
+
+  /**
    * For testing only, do not use.
    */
   @VisibleForTesting
@@ -1841,40 +1962,49 @@ public class TxnHandler {
       stmt = dbConn.createStatement();
      //This is an update statement, thus at any Isolation level will take Write locks so will block
       //all other ops using S4U on TXNS row.
-      StringBuilder buf = new StringBuilder("update TXNS set txn_state = '" + 
TXN_ABORTED +
-        "' where txn_state = '" + TXN_OPEN + "' and txn_id in (");
-      boolean first = true;
-      for (Long id : txnids) {
-        if (first) first = false;
-        else buf.append(',');
-        buf.append(id);
-      }
-      buf.append(')');
+      List<String> queries = new ArrayList<String>();
+
+      StringBuilder prefix = new StringBuilder();
+      StringBuilder suffix = new StringBuilder();
+
+      prefix.append("update TXNS set txn_state = " + quoteChar(TXN_ABORTED) +
+        " where txn_state = " + quoteChar(TXN_OPEN) + " and ");
       if(max_heartbeat > 0) {
-        buf.append(" and txn_last_heartbeat < ").append(max_heartbeat);
+        suffix.append(" and txn_last_heartbeat < ").append(max_heartbeat);
+      } else {
+        suffix.append("");
       }
-      LOG.debug("Going to execute update <" + buf.toString() + ">");
-      updateCnt = stmt.executeUpdate(buf.toString());
-      if(updateCnt < txnids.size()) {
-        /**
-         * have to bail in this case since we don't know which transactions were not Aborted and
-         * thus don't know which locks to delete
-         * This may happen due to a race between {@link #heartbeat(HeartbeatRequest)}  operation and
-         * {@link #performTimeOuts()}
-         */
-        return updateCnt;
+
+      TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false);
+
+      for (String query : queries) {
+        LOG.debug("Going to execute update <" + query + ">");
+        updateCnt = stmt.executeUpdate(query);
+        if (updateCnt < txnids.size()) {
+          /**
+           * have to bail in this case since we don't know which transactions were not Aborted and
+           * thus don't know which locks to delete
+           * This may happen due to a race between {@link #heartbeat(HeartbeatRequest)}  operation and
+           * {@link #performTimeOuts()}
+           */
+          return updateCnt;
+        }
       }
 
-      buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in (");
-      first = true;
-      for (Long id : txnids) {
-        if (first) first = false;
-        else buf.append(',');
-        buf.append(id);
+      queries.clear();
+      prefix.setLength(0);
+      suffix.setLength(0);
+
+      prefix.append("delete from HIVE_LOCKS where ");
+      suffix.append("");
+
+      TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false);
+
+      for (String query : queries) {
+        LOG.debug("Going to execute update <" + query + ">");
+        int rc = stmt.executeUpdate(query);
+        LOG.debug("Removed " + rc + " records from HIVE_LOCKS");
       }
-      buf.append(')');
-      LOG.debug("Going to execute update <" + buf.toString() + ">");
-      stmt.executeUpdate(buf.toString());
     } finally {
       closeStmt(stmt);
     }
@@ -2293,31 +2423,28 @@ public class TxnHandler {
       if(extLockIDs.size() <= 0) {
         return;
       }
-      int deletedLocks = 0;
+
+      List<String> queries = new ArrayList<String>();
+
+      StringBuilder prefix = new StringBuilder();
+      StringBuilder suffix = new StringBuilder();
+
      //include same hl_last_heartbeat condition in case someone heartbeated since the select
-      s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + 
maxHeartbeatTime + " and hl_txnid = 0" +
-        " and hl_lock_ext_id IN (";
-      int numWholeBatches = extLockIDs.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE;
-      for(int i = 0; i < numWholeBatches; i++) {
-        StringBuilder sb = new StringBuilder(s);
-        for(int j = i * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j < (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j++) {
-          sb.append(extLockIDs.get(j)).append(",");
-        }
-        sb.setCharAt(sb.length() - 1, ')');
-        LOG.debug("Removing expired locks via: " + sb.toString());
-        deletedLocks += stmt.executeUpdate(sb.toString());
-        dbConn.commit();
-      }
-      StringBuilder sb = new StringBuilder(s);
-      for(int i = numWholeBatches * TIMED_OUT_TXN_ABORT_BATCH_SIZE; i < extLockIDs.size(); i++) {
-        sb.append(extLockIDs.get(i)).append(",");
+      prefix.append("delete from HIVE_LOCKS where hl_last_heartbeat < ");
+      prefix.append(maxHeartbeatTime);
+      prefix.append(" and hl_txnid = 0 and ");
+      suffix.append("");
+
+      TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false);
+
+      int deletedLocks = 0;
+      for (String query : queries) {
+        LOG.debug("Removing expired locks via: " + query);
+        deletedLocks += stmt.executeUpdate(query);
       }
-      sb.setCharAt(sb.length() - 1, ')');
-      LOG.debug("Removing expired locks via: " + sb.toString());
-      deletedLocks += stmt.executeUpdate(sb.toString());
       if(deletedLocks > 0) {
         LOG.info("Deleted " + deletedLocks + " ext locks from HIVE_LOCKS due 
to timeout (vs. " +
-          extLockIDs.size() + " found. List: " + extLockIDs + ") 
maxHeartbeatTime=" + maxHeartbeatTime);
+            extLockIDs.size() + " found. List: " + extLockIDs + ") 
maxHeartbeatTime=" + maxHeartbeatTime);
       }
       LOG.debug("Going to commit");
       dbConn.commit();

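To illustrate the batching logic of buildQueryWithINClause() added above, a hedged sketch (class name hypothetical) that lowers the IN-clause element cap to force a split; the expected output is shown in the comment:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.txn.TxnHandler;

    public class InClauseSplitSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Force tiny IN-clause batches so the split is visible
        conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 2);
        List<String> queries = new ArrayList<String>();
        TxnHandler.buildQueryWithINClause(conf, queries,
            new StringBuilder("delete from TXNS where "), new StringBuilder(""),
            Arrays.asList(1L, 2L, 3L, 4L, 5L), "txn_id", true, false);
        // Everything still fits under the default 100KB length cap, so this
        // yields one statement:
        //   delete from TXNS where (txn_id in (1,2) or txn_id in (3,4) or txn_id in (5))
        for (String q : queries) {
          System.out.println(q);
        }
      }
    }

The length cap itself comes from needNewQuery(), which estimates a string's memory footprint as 8 * ((2 * length + 45) / 8) bytes, so with the default 100KB cap a statement is closed out once it grows to roughly 51,000 characters.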
http://git-wip-us.apache.org/repos/asf/hive/blob/0aaddb7d/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index e53daae..930af7c 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -23,12 +23,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
 
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -1219,6 +1217,83 @@ public class TestTxnHandler {
     stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = 
hl_last_heartbeat + 1");
   }
 
+  @Test
+  public void testBuildQueryWithINClause() throws Exception {
+    List<String> queries = new ArrayList<String>();
+
+    StringBuilder prefix = new StringBuilder();
+    StringBuilder suffix = new StringBuilder();
+
+    // Note, this is a "real" query that depends on one of the metastore tables
+    prefix.append("select count(*) from TXNS where ");
+    suffix.append(" and TXN_STATE = 'o'");
+
+    // Case 1 - Max in list members: 10; Max query string length: 1KB
+    //          The first query happens to have 2 full batches.
+    conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1);
+    conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 10);
+    List<Long> inList = new ArrayList<Long>();
+    for (long i = 1; i <= 200; i++) {
+      inList.add(i);
+    }
+    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    Assert.assertEquals(1, queries.size());
+    runAgainstDerby(queries);
+
+    // Case 2 - Max in list members: 10; Max query string length: 1KB
+    //          The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member
+    queries.clear();
+    inList.add((long)201);
+    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    Assert.assertEquals(2, queries.size());
+    runAgainstDerby(queries);
+
+    // Case 3 - Max in list members: 1000; Max query string length: 5KB
+    conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 10);
+    conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000);
+    queries.clear();
+    for (long i = 202; i <= 4321; i++) {
+      inList.add(i);
+    }
+    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    Assert.assertEquals(3, queries.size());
+    runAgainstDerby(queries);
+
+    // Case 4 - NOT IN list
+    queries.clear();
+    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
+    Assert.assertEquals(3, queries.size());
+    runAgainstDerby(queries);
+
+    // Case 5 - No parenthesis
+    queries.clear();
+    suffix.setLength(0);
+    suffix.append("");
+    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
+    Assert.assertEquals(3, queries.size());
+    runAgainstDerby(queries);
+  }
+
+  /** Verify queries can run against Derby DB.
+   *  As long as Derby doesn't complain, we assume the query is syntactically/semantically correct.
+   */
+  private void runAgainstDerby(List<String> queries) throws Exception {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+
+    try {
+      conn = TxnDbUtil.getConnection();
+      stmt = conn.createStatement();
+      for (String query : queries) {
+        rs = stmt.executeQuery(query);
+        Assert.assertTrue("The query is not valid", rs.next());
+      }
+    } finally {
+      TxnDbUtil.closeResources(conn, stmt, rs);
+    }
+  }
+
   @Before
   public void setUp() throws Exception {
     TxnDbUtil.prepDb();
