http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java new file mode 100644 index 0000000..96a7f56 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hive.common.classification.RetrySemantics; +import org.apache.hadoop.hive.metastore.api.*; + +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * A handler to answer transaction related calls that come into the metastore + * server. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface TxnStore extends Configurable { + + enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, + WriteSetCleaner, CompactionScheduler} + // Compactor states (Should really be enum) + String INITIATED_RESPONSE = "initiated"; + String WORKING_RESPONSE = "working"; + String CLEANING_RESPONSE = "ready for cleaning"; + String FAILED_RESPONSE = "failed"; + String SUCCEEDED_RESPONSE = "succeeded"; + String ATTEMPTED_RESPONSE = "attempted"; + + int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000; + + /** + * Get information about open transactions. This gives extensive information about the + * transactions rather than just the list of transactions. This should be used when the need + * is to see information about the transactions (e.g. show transactions). + * @return information about open transactions + * @throws MetaException + */ + @RetrySemantics.ReadOnly + GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException; + + /** + * Get list of valid transactions. This gives just the list of transactions that are open. + * @return list of open transactions, as well as a high water mark. + * @throws MetaException + */ + @RetrySemantics.ReadOnly + GetOpenTxnsResponse getOpenTxns() throws MetaException; + + /** + * Get the count for open transactions. + * @throws MetaException + */ + @RetrySemantics.ReadOnly + void countOpenTxns() throws MetaException; + + /** + * Open a set of transactions + * @param rqst request to open transactions + * @return information on opened transactions + * @throws MetaException + */ + @RetrySemantics.Idempotent + OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException; + + /** + * Abort (rollback) a transaction. 
+ * @param rqst info on transaction to abort + * @throws NoSuchTxnException + * @throws MetaException + */ + @RetrySemantics.Idempotent + void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException; + + /** + * Abort (rollback) a list of transactions in one request. + * @param rqst info on transactions to abort + * @throws NoSuchTxnException + * @throws MetaException + */ + @RetrySemantics.Idempotent + void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException; + + /** + * Commit a transaction + * @param rqst info on transaction to commit + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.Idempotent + void commitTxn(CommitTxnRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Obtain a lock. + * @param rqst information on the lock to obtain. If the requester is part of a transaction + * the txn information must be included in the lock request. + * @return info on the lock, including whether it was obtained. + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.CannotRetry + LockResponse lock(LockRequest rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Check whether a lock has been obtained. This is used after {@link #lock} returned a wait + * state. + * @param rqst info on the lock to check + * @return info on the state of the lock + * @throws NoSuchTxnException + * @throws NoSuchLockException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + LockResponse checkLock(CheckLockRequest rqst) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; + + /** + * Unlock a lock. It is not legal to call this if the caller is part of a txn. In that case + * the txn should be committed or aborted instead. 
(Note someday this will change since + * multi-statement transactions will allow unlocking in the transaction.) + * @param rqst lock to unlock + * @throws NoSuchLockException + * @throws TxnOpenException + * @throws MetaException + */ + @RetrySemantics.Idempotent + void unlock(UnlockRequest rqst) + throws NoSuchLockException, TxnOpenException, MetaException; + + /** + * Get information on current locks. + * @param rqst lock information to retrieve + * @return lock information. + * @throws MetaException + */ + @RetrySemantics.ReadOnly + ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException; + + /** + * Send a heartbeat for a lock or a transaction + * @param ids lock and/or txn id to heartbeat + * @throws NoSuchTxnException + * @throws NoSuchLockException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + void heartbeat(HeartbeatRequest ids) + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; + + /** + * Heartbeat a group of transactions together + * @param rqst set of transactions to heartbeat + * @return info on txns that were heartbeated + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) + throws MetaException; + + /** + * Submit a compaction request into the queue. This is called when a user manually requests a + * compaction. 
+ * @param rqst information on what to compact + * @return id of the compaction that has been started or existing id if this resource is already scheduled + * @throws MetaException + */ + @RetrySemantics.Idempotent + CompactionResponse compact(CompactionRequest rqst) throws MetaException; + + /** + * Show list of current compactions + * @param rqst info on which compactions to show + * @return compaction information + * @throws MetaException + */ + @RetrySemantics.ReadOnly + ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException; + + /** + * Add information on a set of dynamic partitions that participated in a transaction. + * @param rqst dynamic partition info. + * @throws NoSuchTxnException + * @throws TxnAbortedException + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + void addDynamicPartitions(AddDynamicPartitions rqst) + throws NoSuchTxnException, TxnAbortedException, MetaException; + + /** + * Clean up corresponding records in metastore tables + * @param type Hive object type + * @param db database object + * @param table table object + * @param partitionIterator partition iterator + * @throws MetaException + */ + @RetrySemantics.Idempotent + void cleanupRecords(HiveObjectType type, Database db, Table table, + Iterator<Partition> partitionIterator) throws MetaException; + + /** + * Timeout transactions and/or locks. This should only be called by the compactor. + */ + @RetrySemantics.Idempotent + void performTimeOuts(); + + /** + * This will look through the completed_txn_components table and look for partitions or tables + * that may be ready for compaction. Also, look through txns and txn_components tables for + * aborted transactions that we should add to the list. + * @param maxAborted Maximum number of aborted queries to allow before marking this as a + * potential compaction. + * @return list of CompactionInfo structs. 
These will not have id, type, + * or runAs set since these are only potential compactions not actual ones. + */ + @RetrySemantics.ReadOnly + Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException; + + /** + * Sets the user to run as. This is for the case + * where the request was generated by the user and so the worker must set this value later. + * @param cq_id id of this entry in the queue + * @param user user to run the jobs as + */ + @RetrySemantics.Idempotent + void setRunAs(long cq_id, String user) throws MetaException; + + /** + * This will grab the next compaction request off of + * the queue, and assign it to the worker. + * @param workerId id of the worker calling this, will be recorded in the db + * @return an info element for this compaction request, or null if there is no work to do now. + */ + @RetrySemantics.ReadOnly + CompactionInfo findNextToCompact(String workerId) throws MetaException; + + /** + * This will mark an entry in the queue as compacted + * and put it in the ready to clean state. + * @param info info on the compaction entry to mark as compacted. + */ + @RetrySemantics.SafeToRetry + void markCompacted(CompactionInfo info) throws MetaException; + + /** + * Find entries in the queue that are ready to + * be cleaned. + * @return information on the entry in the queue. + */ + @RetrySemantics.ReadOnly + List<CompactionInfo> findReadyToClean() throws MetaException; + + /** + * This will remove an entry from the queue after + * it has been compacted. + * + * @param info info on the compaction entry to remove + */ + @RetrySemantics.CannotRetry + void markCleaned(CompactionInfo info) throws MetaException; + + /** + * Mark a compaction entry as failed. This will move it to the compaction history queue with a + * failed status. It will NOT clean up aborted transactions in the table/partition associated + * with this compaction. + * @param info information on the compaction that failed. 
+ * @throws MetaException + */ + @RetrySemantics.CannotRetry + void markFailed(CompactionInfo info) throws MetaException; + + /** + * Clean up aborted transactions from txns that have no components in txn_components. The reason such + * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and + * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. + */ + @RetrySemantics.SafeToRetry + void cleanEmptyAbortedTxns() throws MetaException; + + /** + * This will take all entries assigned to workers + * on a host return them to INITIATED state. The initiator should use this at start up to + * clean entries from any workers that were in the middle of compacting when the metastore + * shutdown. It does not reset entries from worker threads on other hosts as those may still + * be working. + * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, + * so that like hostname% will match the worker id. + */ + @RetrySemantics.Idempotent + void revokeFromLocalWorkers(String hostname) throws MetaException; + + /** + * This call will return all compaction queue + * entries assigned to a worker but over the timeout back to the initiated state. + * This should be called by the initiator on start up and occasionally when running to clean up + * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called + * first. + * @param timeout number of milliseconds since start time that should elapse before a worker is + * declared dead. + */ + @RetrySemantics.Idempotent + void revokeTimedoutWorkers(long timeout) throws MetaException; + + /** + * Queries metastore DB directly to find columns in the table which have statistics information. + * If {@code ci} includes partition info then per partition stats info is examined, otherwise + * table level stats are examined. 
+ * @throws MetaException + */ + @RetrySemantics.ReadOnly + List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException; + + /** + * Record the highest txn id that the {@code ci} compaction job will pay attention to. + */ + @RetrySemantics.Idempotent + void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException; + + /** + * For any given compactable entity (partition, table if not partitioned) the history of compactions + * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the + * history such that a configurable number of each type of state is present. Any other entries + * can be purged. This scheme has advantage of always retaining the last failure/success even if + * it's not recent. + * @throws MetaException + */ + @RetrySemantics.SafeToRetry + void purgeCompactionHistory() throws MetaException; + + /** + * WriteSet tracking is used to ensure proper transaction isolation. This method deletes the + * transaction metadata once it becomes unnecessary. + */ + @RetrySemantics.SafeToRetry + void performWriteSetGC(); + + /** + * Determine if there are enough consecutive failures compacting a table or partition that no + * new automatic compactions should be scheduled. User initiated compactions do not do this + * check. + * @param ci Table or partition to check. + * @return true if it is ok to compact, false if there have been too many failures. + * @throws MetaException + */ + @RetrySemantics.ReadOnly + boolean checkFailedCompactions(CompactionInfo ci) throws MetaException; + + @VisibleForTesting + int numLocksInLockTable() throws SQLException, MetaException; + + @VisibleForTesting + long setTimeout(long milliseconds); + + @RetrySemantics.Idempotent + MutexAPI getMutexAPI(); + + /** + * This is primarily designed to provide coarse grained mutex support to operations running + * inside the Metastore (of which there could be several instances). 
The initial goal is to + * ensure that various sub-processes of the Compactor don't step on each other. + * + * In RDBMS world each {@code LockHandle} uses a java.sql.Connection so use it sparingly. + */ + interface MutexAPI { + /** + * The {@code key} is name of the lock. Will acquire an exclusive lock or block. It returns + * a handle which must be used to release the lock. Each invocation returns a new handle. + */ + LockHandle acquireLock(String key) throws MetaException; + + /** + * Same as {@link #acquireLock(String)} but takes an already existing handle as input. This + * will associate the lock on {@code key} with the same handle. All locks associated with + * the same handle will be released together. + * @param handle not NULL + */ + void acquireLock(String key, LockHandle handle) throws MetaException; + interface LockHandle { + /** + * Releases all locks associated with this handle. + */ + void releaseLocks(); + } + } + + /** + * Once a {@link java.util.concurrent.ThreadPoolExecutor} Worker submits a job to the cluster, + * it calls this to update the metadata. + * @param id {@link CompactionInfo#id} + */ + @RetrySemantics.Idempotent + void setHadoopJobId(String hadoopJobId, long id); +}
http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java new file mode 100644 index 0000000..7579ae8 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ValidCompactorTxnList; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; +import java.util.Map; + +public class TxnUtils { + private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class); + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to + * read the files, and thus treats both open and aborted transactions as invalid. + * @param txns txn list from the metastore + * @param currentTxn Current transaction that the user has open. If this is greater than 0 it + * will be removed from the exceptions list so that the user sees his own + * transaction as valid. + * @return a valid txn list. + */ + public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { + /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0 + * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which + * doesn't make sense for Snapshot Isolation. 
Of course for Read Committed, the list should + * include the latest committed set.*/ + long highWater = txns.getTxn_high_water_mark(); + List<Long> open = txns.getOpen_txns(); + BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits()); + long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)]; + int i = 0; + for(long txn: open) { + if (currentTxn > 0 && currentTxn == txn) continue; + exceptions[i++] = txn; + } + if(txns.isSetMin_open_txn()) { + return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn()); + } + else { + return new ValidReadTxnList(exceptions, abortedBits, highWater); + } + } + + /** + * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a + * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to + * compact the files, and thus treats only open transactions as invalid. Additionally any + * txnId > highestOpenTxnId is also invalid. This is to avoid creating something like + * delta_17_120 where txnId 80, for example, is still open. + * @param txns txn list from the metastore + * @return a valid txn list. + */ + public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { + //highWater is the last txn id that has been allocated + long highWater = txns.getTxn_high_water_mark(); + long minOpenTxn = Long.MAX_VALUE; + long[] exceptions = new long[txns.getOpen_txnsSize()]; + int i = 0; + for (TxnInfo txn : txns.getOpen_txns()) { + if (txn.getState() == TxnState.OPEN) { + minOpenTxn = Math.min(minOpenTxn, txn.getId()); + } + else { + //only need aborted since we don't consider anything above minOpenTxn + exceptions[i++] = txn.getId(); + } + } + if(i < exceptions.length) { + exceptions = Arrays.copyOf(exceptions, i); + } + highWater = minOpenTxn == Long.MAX_VALUE ? 
highWater : minOpenTxn - 1; + BitSet bitSet = new BitSet(exceptions.length); + bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything in exceptions are aborted + if(minOpenTxn == Long.MAX_VALUE) { + return new ValidCompactorTxnList(exceptions, bitSet, highWater); + } + else { + return new ValidCompactorTxnList(exceptions, bitSet, highWater, minOpenTxn); + } + } + + /** + * Get an instance of the TxnStore that is appropriate for this store + * @param conf configuration + * @return txn store + */ + public static TxnStore getTxnStore(Configuration conf) { + String className = MetastoreConf.getVar(conf, ConfVars.TXN_STORE_IMPL); + try { + TxnStore handler = JavaUtils.getClass(className, TxnStore.class).newInstance(); + handler.setConf(conf); + return handler; + } catch (Exception e) { + LOG.error("Unable to instantiate raw store directly in fastpath mode", e); + throw new RuntimeException(e); + } + } + + /** Checks if a table is a valid ACID table. + * Note, users are responsible for using the correct TxnManager. We do not look at + * SessionState.get().getTxnMgr().supportsAcid() here + * @param table table + * @return true if table is a legit ACID table, false otherwise + */ + public static boolean isAcidTable(Table table) { + if (table == null) { + return false; + } + Map<String, String> parameters = table.getParameters(); + String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } + + /** + * Build a query (or queries if one query is too big) with specified "prefix" and "suffix", + * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6) + * For NOT IN case, NOT IN list is broken into multiple AND clauses. 
+ * @param queries array of complete query strings + * @param prefix part of the query that comes before IN list + * @param suffix part of the query that comes after IN list + * @param inList the list containing IN list values + * @param inColumn column name of IN list operator + * @param addParens add a pair of parenthesis outside the IN lists + * e.g. ( id in (1,2,3) OR id in (4,5,6) ) + * @param notIn clause to be broken up is NOT IN + */ + public static void buildQueryWithINClause(Configuration conf, List<String> queries, StringBuilder prefix, + StringBuilder suffix, List<Long> inList, + String inColumn, boolean addParens, boolean notIn) { + if (inList == null || inList.size() == 0) { + throw new IllegalArgumentException("The IN list is empty!"); + } + int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE); + int numWholeBatches = inList.size() / batchSize; + StringBuilder buf = new StringBuilder(); + buf.append(prefix); + if (addParens) { + buf.append("("); + } + buf.append(inColumn); + if (notIn) { + buf.append(" not in ("); + } else { + buf.append(" in ("); + } + + for (int i = 0; i <= numWholeBatches; i++) { + if (i * batchSize == inList.size()) { + // At this point we just realized we don't need another query + break; + } + + if (needNewQuery(conf, buf)) { + // Wrap up current query string + if (addParens) { + buf.append(")"); + } + buf.append(suffix); + queries.add(buf.toString()); + + // Prepare a new query string + buf.setLength(0); + } + + if (i > 0) { + if (notIn) { + if (buf.length() == 0) { + buf.append(prefix); + if (addParens) { + buf.append("("); + } + } else { + buf.append(" and "); + } + buf.append(inColumn); + buf.append(" not in ("); + } else { + if (buf.length() == 0) { + buf.append(prefix); + if (addParens) { + buf.append("("); + } + } else { + buf.append(" or "); + } + buf.append(inColumn); + buf.append(" in ("); + } + } + + for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) { 
+ buf.append(inList.get(j)).append(","); + } + buf.setCharAt(buf.length() - 1, ')'); + } + + if (addParens) { + buf.append(")"); + } + buf.append(suffix); + queries.add(buf.toString()); + } + + /** Estimate if the size of a string will exceed certain limit */ + private static boolean needNewQuery(Configuration conf, StringBuilder sb) { + int queryMemoryLimit = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH); + // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml + long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8); + return sizeInBytes / 1024 > queryMemoryLimit; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java index 81f8a85..40f7393 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/JavaUtils.java @@ -17,7 +17,16 @@ */ package org.apache.hadoop.hive.metastore.utils; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; + public class JavaUtils { + public static final Logger LOG = LoggerFactory.getLogger(JavaUtils.class); + /** * Standard way of getting classloader in Hive code (outside of Hadoop). * @@ -34,4 +43,41 @@ public class JavaUtils { } return classLoader; } + + @SuppressWarnings(value = "unchecked") + public static <T> Class<? extends T> getClass(String className, Class<T> clazz) + throws MetaException { + try { + return (Class<? 
extends T>) Class.forName(className, true, getClassLoader()); + } catch (ClassNotFoundException e) { + throw new MetaException(className + " class not found"); + } + } + + /** + * @return name of current host + */ + public static String hostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Unable to resolve my host name " + e.getMessage()); + throw new RuntimeException(e); + } + } + + /** + * Utility method for ACID to normalize logging info. Matches + * org.apache.hadoop.hive.metastore.api.LockRequest#toString + */ + public static String lockIdToString(long extLockId) { + return "lockid:" + extLockId; + } + /** + * Utility method for ACID to normalize logging info. Matches + * org.apache.hadoop.hive.metastore.api.LockResponse#toString + */ + public static String txnIdToString(long txnId) { + return "txnid:" + txnId; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java new file mode 100644 index 0000000..b3f1749 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.utils; + + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * A utility class that can convert a HashMap of Properties into a colon separated string, + * and can take the same format of string and convert it to a HashMap of Properties. + */ +public class StringableMap extends HashMap<String, String> { + + public StringableMap(String s) { + String[] parts = s.split(":", 2); + // read that many chars + int numElements = Integer.parseInt(parts[0]); + s = parts[1]; + for (int i = 0; i < numElements; i++) { + parts = s.split(":", 2); + int len = Integer.parseInt(parts[0]); + String key = null; + if (len > 0) key = parts[1].substring(0, len); + parts = parts[1].substring(len).split(":", 2); + len = Integer.parseInt(parts[0]); + String value = null; + if (len > 0) value = parts[1].substring(0, len); + s = parts[1].substring(len); + put(key, value); + } + } + + public StringableMap(Map<String, String> m) { + super(m); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(size()); + buf.append(':'); + if (size() > 0) { + for (Map.Entry<String, String> entry : entrySet()) { + int length = (entry.getKey() == null) ? 0 : entry.getKey().length(); + buf.append(entry.getKey() == null ? 0 : length); + buf.append(':'); + if (length > 0) buf.append(entry.getKey()); + length = (entry.getValue() == null) ? 
0 : entry.getValue().length(); + buf.append(length); + buf.append(':'); + if (length > 0) buf.append(entry.getValue()); + } + } + return buf.toString(); + } + + public Properties toProperties() { + Properties props = new Properties(); + props.putAll(this); + return props; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java index 082b1b0..9aff091 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hive.metastore.conf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.conf.Configuration; +import org.hamcrest.CoreMatchers; +import org.hamcrest.core.StringContains; +import org.hamcrest.core.StringEndsWith; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -411,4 +414,18 @@ public class TestMetastoreConf { Assert.assertEquals("abc", MetastoreConf.get(conf, "a.random.key")); } + @Test + public void dumpConfig() throws IOException { + createConfFile("metastore-site.xml", true, "METASTORE_HOME", instaMap( + "test.long", "23" + )); + conf = MetastoreConf.newMetastoreConf(); + String dump = MetastoreConf.dumpConfig(conf); + Assert.assertThat(dump, new StringContains("Used metastore-site file: file:/")); + Assert.assertThat(dump, new StringContains("Key: <test.long> old hive key: <hive.test.long> value: <23>")); + Assert.assertThat(dump, new StringContains("Key: <test.str> old hive key: <hive.test.str> value: <defaultval>")); + 
Assert.assertThat(dump, new StringEndsWith("Finished MetastoreConf object.\n")); + // Make sure the hidden keys didn't get published + Assert.assertThat(dump, CoreMatchers.not(new StringContains(ConfVars.PWD.varname))); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestDataSourceProviderFactory.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestDataSourceProviderFactory.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestDataSourceProviderFactory.java new file mode 100644 index 0000000..3f21ed1 --- /dev/null +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/datasource/TestDataSourceProviderFactory.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.datasource; + +import com.jolbox.bonecp.BoneCPDataSource; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.sql.DataSource; +import java.sql.SQLException; + +public class TestDataSourceProviderFactory { + + private Configuration conf; + + @Before + public void init() { + conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setVar(conf, ConfVars.CONNECTION_USER_NAME, "dummyUser"); + MetastoreConf.setVar(conf, ConfVars.PWD, "dummyPass"); + } + + @Test + public void testNoDataSourceCreatedWithoutProps() throws SQLException { + + DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNull(dsp); + + MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, BoneCPDataSourceProvider.BONECP); + + dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNull(dsp); + } + + @Test + public void testCreateBoneCpDataSource() throws SQLException { + + MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, BoneCPDataSourceProvider.BONECP); + conf.set(BoneCPDataSourceProvider.BONECP + ".firstProp", "value"); + conf.set(BoneCPDataSourceProvider.BONECP + ".secondProp", "value"); + + DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNotNull(dsp); + + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof BoneCPDataSource); + } + + @Test + public void testSetBoneCpStringProperty() throws SQLException { + + MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, BoneCPDataSourceProvider.BONECP); + conf.set(BoneCPDataSourceProvider.BONECP + ".initSQL", "select 1 from dual"); + + DataSourceProvider dsp = 
DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNotNull(dsp); + + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof BoneCPDataSource); + Assert.assertEquals("select 1 from dual", ((BoneCPDataSource)ds).getInitSQL()); + } + + @Test + public void testSetBoneCpNumberProperty() throws SQLException { + + MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, BoneCPDataSourceProvider.BONECP); + conf.set(BoneCPDataSourceProvider.BONECP + ".acquireRetryDelayInMs", "599"); + + DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNotNull(dsp); + + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof BoneCPDataSource); + Assert.assertEquals(599L, ((BoneCPDataSource)ds).getAcquireRetryDelayInMs()); + } + + @Test + public void testSetBoneCpBooleanProperty() throws SQLException { + + MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, BoneCPDataSourceProvider.BONECP); + conf.set(BoneCPDataSourceProvider.BONECP + ".disableJMX", "true"); + + DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNotNull(dsp); + + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof BoneCPDataSource); + Assert.assertEquals(true, ((BoneCPDataSource)ds).isDisableJMX()); + } + + @Test + public void testCreateHikariCpDataSource() throws SQLException { + + MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, HikariCPDataSourceProvider.HIKARI); + // This is needed to prevent the HikariDataSource from trying to connect to the DB + conf.set(HikariCPDataSourceProvider.HIKARI + ".initializationFailTimeout", "-1"); + + DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNotNull(dsp); + + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof HikariDataSource); + } + + @Test + public void testSetHikariCpStringProperty() throws SQLException { + + MetastoreConf.setVar(conf, 
ConfVars.CONNECTION_POOLING_TYPE, HikariCPDataSourceProvider.HIKARI); + conf.set(HikariCPDataSourceProvider.HIKARI + ".connectionInitSql", "select 1 from dual"); + conf.set(HikariCPDataSourceProvider.HIKARI + ".initializationFailTimeout", "-1"); + + DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNotNull(dsp); + + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof HikariDataSource); + Assert.assertEquals("select 1 from dual", ((HikariDataSource)ds).getConnectionInitSql()); + } + + @Test + public void testSetHikariCpNumberProperty() throws SQLException { + + MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, HikariCPDataSourceProvider.HIKARI); + conf.set(HikariCPDataSourceProvider.HIKARI + ".idleTimeout", "59999"); + conf.set(HikariCPDataSourceProvider.HIKARI + ".initializationFailTimeout", "-1"); + + DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNotNull(dsp); + + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof HikariDataSource); + Assert.assertEquals(59999L, ((HikariDataSource)ds).getIdleTimeout()); + } + + @Test + public void testSetHikariCpBooleanProperty() throws SQLException { + + MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, HikariCPDataSourceProvider.HIKARI); + conf.set(HikariCPDataSourceProvider.HIKARI + ".allowPoolSuspension", "false"); + conf.set(HikariCPDataSourceProvider.HIKARI + ".initializationFailTimeout", "-1"); + + DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + Assert.assertNotNull(dsp); + + DataSource ds = dsp.create(conf); + Assert.assertTrue(ds instanceof HikariDataSource); + Assert.assertEquals(false, ((HikariDataSource)ds).isAllowPoolSuspension()); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java 
---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java new file mode 100644 index 0000000..d8640b5 --- /dev/null +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Test; + +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; + +public class TestTxnHandlerNegative { + static final private Logger LOG = LoggerFactory.getLogger(TestTxnHandlerNegative.class); + + /** + * this intentionally sets a bad URL for connection to test error handling logic + * in TxnHandler + * @throws Exception + */ + @Test + public void testBadConnection() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CONNECTURLKEY, "blah"); + RuntimeException e = null; + try { + TxnUtils.getTxnStore(conf); + } + catch(RuntimeException ex) { + LOG.info("Expected error: " + ex.getMessage(), ex); + e = ex; + } + assertNotNull(e); + assertTrue( + e.getMessage().contains("No suitable driver found for blah") + || e.getMessage().contains("Failed to get driver instance for jdbcUrl=blah") + ); + } +}
