http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
new file mode 100644
index 0000000..96a7f56
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.common.classification.RetrySemantics;
+import org.apache.hadoop.hive.metastore.api.*;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A handler to answer transaction related calls that come into the metastore
+ * server.
+ */
[email protected]
[email protected]
+public interface TxnStore extends Configurable {
+
+  enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, 
CheckLock,
+    WriteSetCleaner, CompactionScheduler}
+  // Compactor states (Should really be enum)
+  String INITIATED_RESPONSE = "initiated";
+  String WORKING_RESPONSE = "working";
+  String CLEANING_RESPONSE = "ready for cleaning";
+  String FAILED_RESPONSE = "failed";
+  String SUCCEEDED_RESPONSE = "succeeded";
+  String ATTEMPTED_RESPONSE = "attempted";
+
+  int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000;
+
+  /**
+   * Get information about open transactions.  This gives extensive 
information about the
+   * transactions rather than just the list of transactions.  This should be 
used when the need
+   * is to see information about the transactions (e.g. show transactions).
+   * @return information about open transactions
+   * @throws MetaException
+   */
+  @RetrySemantics.ReadOnly
+  GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
+
+  /**
+   * Get list of valid transactions.  This gives just the list of transactions 
that are open.
+   * @return list of open transactions, as well as a high water mark.
+   * @throws MetaException
+   */
+  @RetrySemantics.ReadOnly
+  GetOpenTxnsResponse getOpenTxns() throws MetaException;
+
+  /**
+   * Get the count for open transactions.
+   * @throws MetaException
+   */
+  @RetrySemantics.ReadOnly
+  void countOpenTxns() throws MetaException;
+
+  /**
+   * Open a set of transactions
+   * @param rqst request to open transactions
+   * @return information on opened transactions
+   * @throws MetaException
+   */
+  @RetrySemantics.Idempotent
+  OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
+
+  /**
+   * Abort (rollback) a transaction.
+   * @param rqst info on transaction to abort
+   * @throws NoSuchTxnException
+   * @throws MetaException
+   */
+  @RetrySemantics.Idempotent
+  void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, 
MetaException, TxnAbortedException;
+
+  /**
+   * Abort (rollback) a list of transactions in one request.
+   * @param rqst info on transactions to abort
+   * @throws NoSuchTxnException
+   * @throws MetaException
+   */
+  @RetrySemantics.Idempotent
+  void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, 
MetaException;
+
+  /**
+   * Commit a transaction
+   * @param rqst info on transaction to commit
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  @RetrySemantics.Idempotent
+  void commitTxn(CommitTxnRequest rqst)
+    throws NoSuchTxnException, TxnAbortedException,  MetaException;
+
+  /**
+   * Obtain a lock.
+   * @param rqst information on the lock to obtain.  If the requester is part 
of a transaction
+   *             the txn information must be included in the lock request.
+   * @return info on the lock, including whether it was obtained.
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  @RetrySemantics.CannotRetry
+  LockResponse lock(LockRequest rqst)
+    throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+  /**
+   * Check whether a lock has been obtained.  This is used after {@link #lock} 
returned a wait
+   * state.
+   * @param rqst info on the lock to check
+   * @return info on the state of the lock
+   * @throws NoSuchTxnException
+   * @throws NoSuchLockException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  @RetrySemantics.SafeToRetry
+  LockResponse checkLock(CheckLockRequest rqst)
+    throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, 
MetaException;
+
+  /**
+   * Unlock a lock.  It is not legal to call this if the caller is part of a 
txn.  In that case
+   * the txn should be committed or aborted instead.  (Note someday this will 
change since
+   * multi-statement transactions will allow unlocking in the transaction.)
+   * @param rqst lock to unlock
+   * @throws NoSuchLockException
+   * @throws TxnOpenException
+   * @throws MetaException
+   */
+  @RetrySemantics.Idempotent
+  void unlock(UnlockRequest rqst)
+    throws NoSuchLockException, TxnOpenException, MetaException;
+
+  /**
+   * Get information on current locks.
+   * @param rqst lock information to retrieve
+   * @return lock information.
+   * @throws MetaException
+   */
+  @RetrySemantics.ReadOnly
+  ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException;
+
+  /**
+   * Send a heartbeat for a lock or a transaction
+   * @param ids lock and/or txn id to heartbeat
+   * @throws NoSuchTxnException
+   * @throws NoSuchLockException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  @RetrySemantics.SafeToRetry
+  void heartbeat(HeartbeatRequest ids)
+    throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, 
MetaException;
+
+  /**
+   * Heartbeat a group of transactions together
+   * @param rqst set of transactions to heartbat
+   * @return info on txns that were heartbeated
+   * @throws MetaException
+   */
+  @RetrySemantics.SafeToRetry
+  HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
+    throws MetaException;
+
+  /**
+   * Submit a compaction request into the queue.  This is called when a user 
manually requests a
+   * compaction.
+   * @param rqst information on what to compact
+   * @return id of the compaction that has been started or existing id if this 
resource is already scheduled
+   * @throws MetaException
+   */
+  @RetrySemantics.Idempotent
+  CompactionResponse compact(CompactionRequest rqst) throws MetaException;
+
+  /**
+   * Show list of current compactions
+   * @param rqst info on which compactions to show
+   * @return compaction information
+   * @throws MetaException
+   */
+  @RetrySemantics.ReadOnly
+  ShowCompactResponse showCompact(ShowCompactRequest rqst) throws 
MetaException;
+
+  /**
+   * Add information on a set of dynamic partitions that participated in a 
transaction.
+   * @param rqst dynamic partition info.
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  @RetrySemantics.SafeToRetry
+  void addDynamicPartitions(AddDynamicPartitions rqst)
+      throws NoSuchTxnException,  TxnAbortedException, MetaException;
+
+  /**
+   * Clean up corresponding records in metastore tables
+   * @param type Hive object type
+   * @param db database object
+   * @param table table object
+   * @param partitionIterator partition iterator
+   * @throws MetaException
+   */
+  @RetrySemantics.Idempotent
+  void cleanupRecords(HiveObjectType type, Database db, Table table,
+                             Iterator<Partition> partitionIterator) throws 
MetaException;
+
+  /**
+   * Timeout transactions and/or locks.  This should only be called by the 
compactor.
+   */
+  @RetrySemantics.Idempotent
+  void performTimeOuts();
+
+  /**
+   * This will look through the completed_txn_components table and look for 
partitions or tables
+   * that may be ready for compaction.  Also, look through txns and 
txn_components tables for
+   * aborted transactions that we should add to the list.
+   * @param maxAborted Maximum number of aborted queries to allow before 
marking this as a
+   *                   potential compaction.
+   * @return list of CompactionInfo structs.  These will not have id, type,
+   * or runAs set since these are only potential compactions not actual ones.
+   */
+  @RetrySemantics.ReadOnly
+  Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws 
MetaException;
+
+  /**
+   * Sets the user to run as.  This is for the case
+   * where the request was generated by the user and so the worker must set 
this value later.
+   * @param cq_id id of this entry in the queue
+   * @param user user to run the jobs as
+   */
+  @RetrySemantics.Idempotent
+  void setRunAs(long cq_id, String user) throws MetaException;
+
+  /**
+   * This will grab the next compaction request off of
+   * the queue, and assign it to the worker.
+   * @param workerId id of the worker calling this, will be recorded in the db
+   * @return an info element for this compaction request, or null if there is 
no work to do now.
+   */
+  @RetrySemantics.ReadOnly
+  CompactionInfo findNextToCompact(String workerId) throws MetaException;
+
+  /**
+   * This will mark an entry in the queue as compacted
+   * and put it in the ready to clean state.
+   * @param info info on the compaction entry to mark as compacted.
+   */
+  @RetrySemantics.SafeToRetry
+  void markCompacted(CompactionInfo info) throws MetaException;
+
+  /**
+   * Find entries in the queue that are ready to
+   * be cleaned.
+   * @return information on the entry in the queue.
+   */
+  @RetrySemantics.ReadOnly
+  List<CompactionInfo> findReadyToClean() throws MetaException;
+
+  /**
+   * This will remove an entry from the queue after
+   * it has been compacted.
+   * 
+   * @param info info on the compaction entry to remove
+   */
+  @RetrySemantics.CannotRetry
+  void markCleaned(CompactionInfo info) throws MetaException;
+
+  /**
+   * Mark a compaction entry as failed.  This will move it to the compaction 
history queue with a
+   * failed status.  It will NOT clean up aborted transactions in the 
table/partition associated
+   * with this compaction.
+   * @param info information on the compaction that failed.
+   * @throws MetaException
+   */
+  @RetrySemantics.CannotRetry
+  void markFailed(CompactionInfo info) throws MetaException;
+
+  /**
+   * Clean up aborted transactions from txns that have no components in 
txn_components.  The reson such
+   * txns exist can be that now work was done in this txn (e.g. Streaming 
opened TransactionBatch and
+   * abandoned it w/o doing any work) or due to {@link 
#markCleaned(CompactionInfo)} being called.
+   */
+  @RetrySemantics.SafeToRetry
+  void cleanEmptyAbortedTxns() throws MetaException;
+
+  /**
+   * This will take all entries assigned to workers
+   * on a host return them to INITIATED state.  The initiator should use this 
at start up to
+   * clean entries from any workers that were in the middle of compacting when 
the metastore
+   * shutdown.  It does not reset entries from worker threads on other hosts 
as those may still
+   * be working.
+   * @param hostname Name of this host.  It is assumed this prefixes the 
thread's worker id,
+   *                 so that like hostname% will match the worker id.
+   */
+  @RetrySemantics.Idempotent
+  void revokeFromLocalWorkers(String hostname) throws MetaException;
+
+  /**
+   * This call will return all compaction queue
+   * entries assigned to a worker but over the timeout back to the initiated 
state.
+   * This should be called by the initiator on start up and occasionally when 
running to clean up
+   * after dead threads.  At start up {@link #revokeFromLocalWorkers(String)} 
should be called
+   * first.
+   * @param timeout number of milliseconds since start time that should elapse 
before a worker is
+   *                declared dead.
+   */
+  @RetrySemantics.Idempotent
+  void revokeTimedoutWorkers(long timeout) throws MetaException;
+
+  /**
+   * Queries metastore DB directly to find columns in the table which have 
statistics information.
+   * If {@code ci} includes partition info then per partition stats info is 
examined, otherwise
+   * table level stats are examined.
+   * @throws MetaException
+   */
+  @RetrySemantics.ReadOnly
+  List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
+
+  /**
+   * Record the highest txn id that the {@code ci} compaction job will pay 
attention to.
+   */
+  @RetrySemantics.Idempotent
+  void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws 
MetaException;
+
+  /**
+   * For any given compactable entity (partition, table if not partitioned) 
the history of compactions
+   * may look like "sssfffaaasffss", for example.  The idea is to retain the 
tail (most recent) of the
+   * history such that a configurable number of each type of state is present. 
 Any other entries
+   * can be purged.  This scheme has advantage of always retaining the last 
failure/success even if
+   * it's not recent.
+   * @throws MetaException
+   */
+  @RetrySemantics.SafeToRetry
+  void purgeCompactionHistory() throws MetaException;
+
+  /**
+   * WriteSet tracking is used to ensure proper transaction isolation.  This 
method deletes the 
+   * transaction metadata once it becomes unnecessary.  
+   */
+  @RetrySemantics.SafeToRetry
+  void performWriteSetGC();
+
+  /**
+   * Determine if there are enough consecutive failures compacting a table or 
partition that no
+   * new automatic compactions should be scheduled.  User initiated 
compactions do not do this
+   * check.
+   * @param ci  Table or partition to check.
+   * @return true if it is ok to compact, false if there have been too many 
failures.
+   * @throws MetaException
+   */
+  @RetrySemantics.ReadOnly
+  boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
+
+  @VisibleForTesting
+  int numLocksInLockTable() throws SQLException, MetaException;
+
+  @VisibleForTesting
+  long setTimeout(long milliseconds);
+
+  @RetrySemantics.Idempotent
+  MutexAPI getMutexAPI();
+
+  /**
+   * This is primarily designed to provide coarse grained mutex support to 
operations running
+   * inside the Metastore (of which there could be several instances).  The 
initial goal is to 
+   * ensure that various sub-processes of the Compactor don't step on each 
other.
+   * 
+   * In RDMBS world each {@code LockHandle} uses a java.sql.Connection so use 
it sparingly.
+   */
+  interface MutexAPI {
+    /**
+     * The {@code key} is name of the lock. Will acquire and exclusive lock or 
block.  It retuns
+     * a handle which must be used to release the lock.  Each invocation 
returns a new handle.
+     */
+    LockHandle acquireLock(String key) throws MetaException;
+
+    /**
+     * Same as {@link #acquireLock(String)} but takes an already existing 
handle as input.  This 
+     * will associate the lock on {@code key} with the same handle.  All locks 
associated with
+     * the same handle will be released together.
+     * @param handle not NULL
+     */
+    void acquireLock(String key, LockHandle handle) throws MetaException;
+    interface LockHandle {
+      /**
+       * Releases all locks associated with this handle.
+       */
+      void releaseLocks();
+    }
+  }
+
+  /**
+   * Once a {@link java.util.concurrent.ThreadPoolExecutor} Worker submits a 
job to the cluster,
+   * it calls this to update the metadata.
+   * @param id {@link CompactionInfo#id}
+   */
+  @RetrySemantics.Idempotent
+  void setHadoopJobId(String hadoopJobId, long id);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
new file mode 100644
index 0000000..7579ae8
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+public class TxnUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
+
+  /**
+   * Transform a {@link 
org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
+   * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that 
the caller intends to
+   * read the files, and thus treats both open and aborted transactions as 
invalid.
+   * @param txns txn list from the metastore
+   * @param currentTxn Current transaction that the user has open.  If this is 
greater than 0 it
+   *                   will be removed from the exceptions list so that the 
user sees his own
+   *                   transaction as valid.
+   * @return a valid txn list.
+   */
+  public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, 
long currentTxn) {
+    /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) 
assuming currentTxn>0
+     * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see 
result of 8 which
+     * doesn't make sense for Snapshot Isolation.  Of course for Read 
Committed, the list should
+     * inlude the latest committed set.*/
+    long highWater = txns.getTxn_high_water_mark();
+    List<Long> open = txns.getOpen_txns();
+    BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
+    long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
+    int i = 0;
+    for(long txn: open) {
+      if (currentTxn > 0 && currentTxn == txn) continue;
+      exceptions[i++] = txn;
+    }
+    if(txns.isSetMin_open_txn()) {
+      return new ValidReadTxnList(exceptions, abortedBits, highWater, 
txns.getMin_open_txn());
+    }
+    else {
+      return new ValidReadTxnList(exceptions, abortedBits, highWater);
+    }
+  }
+
+  /**
+   * Transform a {@link 
org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
+   * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that 
the caller intends to
+   * compact the files, and thus treats only open transactions as invalid.  
Additionally any
+   * txnId &gt; highestOpenTxnId is also invalid.  This is to avoid creating 
something like
+   * delta_17_120 where txnId 80, for example, is still open.
+   * @param txns txn list from the metastore
+   * @return a valid txn list.
+   */
+  public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse 
txns) {
+    //highWater is the last txn id that has been allocated
+    long highWater = txns.getTxn_high_water_mark();
+    long minOpenTxn = Long.MAX_VALUE;
+    long[] exceptions = new long[txns.getOpen_txnsSize()];
+    int i = 0;
+    for (TxnInfo txn : txns.getOpen_txns()) {
+      if (txn.getState() == TxnState.OPEN) {
+        minOpenTxn = Math.min(minOpenTxn, txn.getId());
+      }
+      else {
+        //only need aborted since we don't consider anything above minOpenTxn
+        exceptions[i++] = txn.getId();
+      }
+    }
+    if(i < exceptions.length) {
+      exceptions = Arrays.copyOf(exceptions, i);
+    }
+    highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
+    BitSet bitSet = new BitSet(exceptions.length);
+    bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything 
in exceptions are aborted
+    if(minOpenTxn == Long.MAX_VALUE) {
+      return new ValidCompactorTxnList(exceptions, bitSet, highWater);
+    }
+    else {
+      return new ValidCompactorTxnList(exceptions, bitSet, highWater, 
minOpenTxn);
+    }
+  }
+
+  /**
+   * Get an instance of the TxnStore that is appropriate for this store
+   * @param conf configuration
+   * @return txn store
+   */
+  public static TxnStore getTxnStore(Configuration conf) {
+    String className = MetastoreConf.getVar(conf, ConfVars.TXN_STORE_IMPL);
+    try {
+      TxnStore handler = JavaUtils.getClass(className, 
TxnStore.class).newInstance();
+      handler.setConf(conf);
+      return handler;
+    } catch (Exception e) {
+      LOG.error("Unable to instantiate raw store directly in fastpath mode", 
e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Checks if a table is a valid ACID table.
+   * Note, users are responsible for using the correct TxnManager. We do not 
look at
+   * SessionState.get().getTxnMgr().supportsAcid() here
+   * @param table table
+   * @return true if table is a legit ACID table, false otherwise
+   */
+  public static boolean isAcidTable(Table table) {
+    if (table == null) {
+      return false;
+    }
+    Map<String, String> parameters = table.getParameters();
+    String tableIsTransactional = 
parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    return tableIsTransactional != null && 
tableIsTransactional.equalsIgnoreCase("true");
+  }
+
+  /**
+   * Build a query (or queries if one query is too big) with specified 
"prefix" and "suffix",
+   * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) 
OR id in (4,5,6)
+   * For NOT IN case, NOT IN list is broken into multiple AND clauses.
+   * @param queries array of complete query strings
+   * @param prefix part of the query that comes before IN list
+   * @param suffix part of the query that comes after IN list
+   * @param inList the list containing IN list values
+   * @param inColumn column name of IN list operator
+   * @param addParens add a pair of parenthesis outside the IN lists
+   *                  e.g. ( id in (1,2,3) OR id in (4,5,6) )
+   * @param notIn clause to be broken up is NOT IN
+   */
+  public static void buildQueryWithINClause(Configuration conf, List<String> 
queries, StringBuilder prefix,
+                                            StringBuilder suffix, List<Long> 
inList,
+                                            String inColumn, boolean 
addParens, boolean notIn) {
+    if (inList == null || inList.size() == 0) {
+      throw new IllegalArgumentException("The IN list is empty!");
+    }
+    int batchSize = MetastoreConf.getIntVar(conf, 
ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
+    int numWholeBatches = inList.size() / batchSize;
+    StringBuilder buf = new StringBuilder();
+    buf.append(prefix);
+    if (addParens) {
+      buf.append("(");
+    }
+    buf.append(inColumn);
+    if (notIn) {
+      buf.append(" not in (");
+    } else {
+      buf.append(" in (");
+    }
+
+    for (int i = 0; i <= numWholeBatches; i++) {
+      if (i * batchSize == inList.size()) {
+        // At this point we just realized we don't need another query
+        break;
+      }
+
+      if (needNewQuery(conf, buf)) {
+        // Wrap up current query string
+        if (addParens) {
+          buf.append(")");
+        }
+        buf.append(suffix);
+        queries.add(buf.toString());
+
+        // Prepare a new query string
+        buf.setLength(0);
+      }
+
+      if (i > 0) {
+        if (notIn) {
+          if (buf.length() == 0) {
+            buf.append(prefix);
+            if (addParens) {
+              buf.append("(");
+            }
+          } else {
+            buf.append(" and ");
+          }
+          buf.append(inColumn);
+          buf.append(" not in (");
+        } else {
+          if (buf.length() == 0) {
+            buf.append(prefix);
+            if (addParens) {
+              buf.append("(");
+            }
+          } else {
+            buf.append(" or ");
+          }
+          buf.append(inColumn);
+          buf.append(" in (");
+        }
+      }
+
+      for (int j = i * batchSize; j < (i + 1) * batchSize && j < 
inList.size(); j++) {
+        buf.append(inList.get(j)).append(",");
+      }
+      buf.setCharAt(buf.length() - 1, ')');
+    }
+
+    if (addParens) {
+      buf.append(")");
+    }
+    buf.append(suffix);
+    queries.add(buf.toString());
+  }
+
+  /** Estimate if the size of a string will exceed certain limit */
+  private static boolean needNewQuery(Configuration conf, StringBuilder sb) {
+    int queryMemoryLimit = MetastoreConf.getIntVar(conf, 
ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH);
+    // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
+    long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
+    return sizeInBytes / 1024 > queryMemoryLimit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java
index 81f8a85..40f7393 100644
--- 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java
@@ -17,7 +17,16 @@
  */
 package org.apache.hadoop.hive.metastore.utils;
 
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 public class JavaUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(JavaUtils.class);
+
   /**
    * Standard way of getting classloader in Hive code (outside of Hadoop).
    *
@@ -34,4 +43,41 @@ public class JavaUtils {
     }
     return classLoader;
   }
+
+  @SuppressWarnings(value = "unchecked")
+  public static <T> Class<? extends T> getClass(String className, Class<T> 
clazz)
+      throws MetaException {
+    try {
+      return (Class<? extends T>) Class.forName(className, true, 
getClassLoader());
+    } catch (ClassNotFoundException e) {
+      throw new MetaException(className + " class not found");
+    }
+  }
+
+  /**
+   * @return name of current host
+   */
+  public static String hostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Unable to resolve my host name " + e.getMessage());
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Utility method for ACID to normalize logging info.  Matches
+   * org.apache.hadoop.hive.metastore.api.LockRequest#toString
+   */
+  public static String lockIdToString(long extLockId) {
+    return "lockid:" + extLockId;
+  }
+  /**
+   * Utility method for ACID to normalize logging info.  Matches
+   * org.apache.hadoop.hive.metastore.api.LockResponse#toString
+   */
+  public static String txnIdToString(long txnId) {
+    return "txnid:" + txnId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java
new file mode 100644
index 0000000..b3f1749
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A utility class that can convert a HashMap of Properties into a colon 
separated string,
+ * and can take the same format of string and convert it to a HashMap of 
Properties.
+ */
+public class StringableMap extends HashMap<String, String> {
+
+  public StringableMap(String s) {
+    String[] parts = s.split(":", 2);
+    // read that many chars
+    int numElements = Integer.parseInt(parts[0]);
+    s = parts[1];
+    for (int i = 0; i < numElements; i++) {
+      parts = s.split(":", 2);
+      int len = Integer.parseInt(parts[0]);
+      String key = null;
+      if (len > 0) key = parts[1].substring(0, len);
+      parts = parts[1].substring(len).split(":", 2);
+      len = Integer.parseInt(parts[0]);
+      String value = null;
+      if (len > 0) value = parts[1].substring(0, len);
+      s = parts[1].substring(len);
+      put(key, value);
+    }
+  }
+
+  public StringableMap(Map<String, String> m) {
+    super(m);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append(size());
+    buf.append(':');
+    if (size() > 0) {
+      for (Map.Entry<String, String> entry : entrySet()) {
+        int length = (entry.getKey() == null) ? 0 : entry.getKey().length();
+        buf.append(entry.getKey() == null ? 0 : length);
+        buf.append(':');
+        if (length > 0) buf.append(entry.getKey());
+        length = (entry.getValue() == null) ? 0 : entry.getValue().length();
+        buf.append(length);
+        buf.append(':');
+        if (length > 0) buf.append(entry.getValue());
+      }
+    }
+    return buf.toString();
+  }
+
+  public Properties toProperties() {
+    Properties props = new Properties();
+    props.putAll(this);
+    return props;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
 
b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
index 082b1b0..9aff091 100644
--- 
a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
+++ 
b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hive.metastore.conf;
 
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.conf.Configuration;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.core.StringContains;
+import org.hamcrest.core.StringEndsWith;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -411,4 +414,18 @@ public class TestMetastoreConf {
     Assert.assertEquals("abc", MetastoreConf.get(conf, "a.random.key"));
   }
 
+  @Test
+  public void dumpConfig() throws IOException {
+    createConfFile("metastore-site.xml", true, "METASTORE_HOME", instaMap(
+        "test.long", "23"
+    ));
+    conf = MetastoreConf.newMetastoreConf();
+    String dump = MetastoreConf.dumpConfig(conf);
+    Assert.assertThat(dump, new StringContains("Used metastore-site file: 
file:/"));
+    Assert.assertThat(dump, new StringContains("Key: <test.long> old hive key: 
<hive.test.long>  value: <23>"));
+    Assert.assertThat(dump, new StringContains("Key: <test.str> old hive key: 
<hive.test.str>  value: <defaultval>"));
+    Assert.assertThat(dump, new StringEndsWith("Finished MetastoreConf 
object.\n"));
+    // Make sure the hidden keys didn't get published
+    Assert.assertThat(dump, CoreMatchers.not(new 
StringContains(ConfVars.PWD.varname)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestDataSourceProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestDataSourceProviderFactory.java
 
b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestDataSourceProviderFactory.java
new file mode 100644
index 0000000..3f21ed1
--- /dev/null
+++ 
b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestDataSourceProviderFactory.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.datasource;
+
+import com.jolbox.bonecp.BoneCPDataSource;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+
+public class TestDataSourceProviderFactory {
+
+  private Configuration conf;
+
+  @Before
+  public void init() {
+    conf = MetastoreConf.newMetastoreConf();
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_USER_NAME, "dummyUser");
+    MetastoreConf.setVar(conf, ConfVars.PWD, "dummyPass");
+  }
+
+  @Test
+  public void testNoDataSourceCreatedWithoutProps() throws SQLException {
+
+    DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNull(dsp);
+
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, 
BoneCPDataSourceProvider.BONECP);
+
+    dsp = DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNull(dsp);
+  }
+
+  @Test
+  public void testCreateBoneCpDataSource() throws SQLException {
+
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, 
BoneCPDataSourceProvider.BONECP);
+    conf.set(BoneCPDataSourceProvider.BONECP + ".firstProp", "value");
+    conf.set(BoneCPDataSourceProvider.BONECP + ".secondProp", "value");
+
+    DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNotNull(dsp);
+
+    DataSource ds = dsp.create(conf);
+    Assert.assertTrue(ds instanceof BoneCPDataSource);
+  }
+
+  @Test
+  public void testSetBoneCpStringProperty() throws SQLException {
+
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, 
BoneCPDataSourceProvider.BONECP);
+    conf.set(BoneCPDataSourceProvider.BONECP + ".initSQL", "select 1 from 
dual");
+
+    DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNotNull(dsp);
+
+    DataSource ds = dsp.create(conf);
+    Assert.assertTrue(ds instanceof BoneCPDataSource);
+    Assert.assertEquals("select 1 from dual", 
((BoneCPDataSource)ds).getInitSQL());
+  }
+
+  @Test
+  public void testSetBoneCpNumberProperty() throws SQLException {
+
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, 
BoneCPDataSourceProvider.BONECP);
+    conf.set(BoneCPDataSourceProvider.BONECP + ".acquireRetryDelayInMs", 
"599");
+
+    DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNotNull(dsp);
+
+    DataSource ds = dsp.create(conf);
+    Assert.assertTrue(ds instanceof BoneCPDataSource);
+    Assert.assertEquals(599L, 
((BoneCPDataSource)ds).getAcquireRetryDelayInMs());
+  }
+
+  @Test
+  public void testSetBoneCpBooleanProperty() throws SQLException {
+
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, 
BoneCPDataSourceProvider.BONECP);
+    conf.set(BoneCPDataSourceProvider.BONECP + ".disableJMX", "true");
+
+    DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNotNull(dsp);
+
+    DataSource ds = dsp.create(conf);
+    Assert.assertTrue(ds instanceof BoneCPDataSource);
+    Assert.assertEquals(true, ((BoneCPDataSource)ds).isDisableJMX());
+  }
+
+  @Test
+  public void testCreateHikariCpDataSource() throws SQLException {
+
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, 
HikariCPDataSourceProvider.HIKARI);
+    // This is needed to prevent the HikariDataSource from trying to connect 
to the DB
+    conf.set(HikariCPDataSourceProvider.HIKARI + ".initializationFailTimeout", 
"-1");
+
+    DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNotNull(dsp);
+
+    DataSource ds = dsp.create(conf);
+    Assert.assertTrue(ds instanceof HikariDataSource);
+  }
+
+  @Test
+  public void testSetHikariCpStringProperty() throws SQLException {
+
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, 
HikariCPDataSourceProvider.HIKARI);
+    conf.set(HikariCPDataSourceProvider.HIKARI + ".connectionInitSql", "select 
1 from dual");
+    conf.set(HikariCPDataSourceProvider.HIKARI + ".initializationFailTimeout", 
"-1");
+
+    DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNotNull(dsp);
+
+    DataSource ds = dsp.create(conf);
+    Assert.assertTrue(ds instanceof HikariDataSource);
+    Assert.assertEquals("select 1 from dual", 
((HikariDataSource)ds).getConnectionInitSql());
+  }
+
+  @Test
+  public void testSetHikariCpNumberProperty() throws SQLException {
+
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, 
HikariCPDataSourceProvider.HIKARI);
+    conf.set(HikariCPDataSourceProvider.HIKARI + ".idleTimeout", "59999");
+    conf.set(HikariCPDataSourceProvider.HIKARI + ".initializationFailTimeout", 
"-1");
+
+    DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNotNull(dsp);
+
+    DataSource ds = dsp.create(conf);
+    Assert.assertTrue(ds instanceof HikariDataSource);
+    Assert.assertEquals(59999L, ((HikariDataSource)ds).getIdleTimeout());
+  }
+
+  @Test
+  public void testSetHikariCpBooleanProperty() throws SQLException {
+
+    MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, 
HikariCPDataSourceProvider.HIKARI);
+    conf.set(HikariCPDataSourceProvider.HIKARI + ".allowPoolSuspension", 
"false");
+    conf.set(HikariCPDataSourceProvider.HIKARI + ".initializationFailTimeout", 
"-1");
+
+    DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf);
+    Assert.assertNotNull(dsp);
+
+    DataSource ds = dsp.create(conf);
+    Assert.assertTrue(ds instanceof HikariDataSource);
+    Assert.assertEquals(false, ((HikariDataSource)ds).isAllowPoolSuspension());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
 
b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
new file mode 100644
index 0000000..d8640b5
--- /dev/null
+++ 
b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
+public class TestTxnHandlerNegative {
+  static final private Logger LOG = 
LoggerFactory.getLogger(TestTxnHandlerNegative.class);
+
+  /**
+   * this intentionally sets a bad URL for connection to test error handling 
logic
+   * in TxnHandler
+   * @throws Exception
+   */
+  @Test
+  public void testBadConnection() throws Exception {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECTURLKEY, "blah");
+    RuntimeException e = null;
+    try {
+      TxnUtils.getTxnStore(conf);
+    }
+    catch(RuntimeException ex) {
+      LOG.info("Expected error: " + ex.getMessage(), ex);
+      e = ex;
+    }
+    assertNotNull(e);
+    assertTrue(
+        e.getMessage().contains("No suitable driver found for blah")
+        || e.getMessage().contains("Failed to get driver instance for 
jdbcUrl=blah")
+    );
+  }
+}

Reply via email to