HIVE-17482: External LLAP client: acquire locks for tables queried directly by LLAP (Jason Dere, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/76d6e8a0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/76d6e8a0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/76d6e8a0

Branch: refs/heads/hive-14535
Commit: 76d6e8a092ed2c73d98f4e128d9620843d00c799
Parents: 2216dad
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Sep 29 11:08:10 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri Sep 29 11:08:10 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/TestAcidOnTez.java    |  87 +++++++++++++++
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 111 +++++++++++++------
 .../org/apache/hadoop/hive/ql/QueryState.java   |  14 +++
 .../hive/ql/parse/BaseSemanticAnalyzer.java     |  13 ++-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   4 +-
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java  |   2 +-
 .../hadoop/hive/ql/session/SessionState.java    |  15 +++
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  90 +++++++++++++--
 8 files changed, 292 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 8b4b21f..ea0aadf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hive.ql;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,14 +32,21 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.After;
@@ -704,6 +715,82 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h
       Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
     }
   }
+
+  @Test
+  public void testGetSplitsLocks() throws Exception {
+    // Need to test this with LLAP settings, which requires some additional configuration to be set.
+    HiveConf modConf = new HiveConf(hiveConf);
+    setupTez(modConf);
+    modConf.setVar(ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    modConf.setVar(ConfVars.HIVEFETCHTASKCONVERSION, "more");
+    modConf.setVar(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS, "localhost");
+
+    // SessionState/Driver needs to be restarted with the Tez conf settings.
+    restartSessionAndDriver(modConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(modConf);
+
+    try {
+      // Request LLAP splits for a table.
+      String queryParam = "select * from " + Table.ACIDTBL;
+      runStatementOnDriver("select get_splits(\"" + queryParam + "\", 1)");
+
+      // The get_splits call should have resulted in a lock on ACIDTBL.
+      ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
+      TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED,
+          "default", Table.ACIDTBL.name, null, slr.getLocks());
+      assertEquals(1, slr.getLocksSize());
+
+      // Try another table.
+      queryParam = "select * from " + Table.ACIDTBLPART;
+      runStatementOnDriver("select get_splits(\"" + queryParam + "\", 1)");
+
+      // Should now have a new lock on ACIDTBLPART.
+      slr = txnHandler.showLocks(new ShowLocksRequest());
+      TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED,
+          "default", Table.ACIDTBLPART.name, null, slr.getLocks());
+      assertEquals(2, slr.getLocksSize());
+
+      // There should be different txn IDs associated with each lock.
+      Set<Long> txnSet = new HashSet<Long>();
+      for (ShowLocksResponseElement lockResponseElem : slr.getLocks()) {
+        txnSet.add(lockResponseElem.getTxnid());
+      }
+      assertEquals(2, txnSet.size());
+
+      List<String> rows = runStatementOnDriver("show transactions");
+      // Header row + 2 transactions = 3 rows
+      assertEquals(3, rows.size());
+    } finally {
+      // Close the session, which should free up the TxnHandler/locks held by the session.
+      // Done in the finally block to make sure we free up the locks; otherwise
+      // the cleanup in tearDown() will get stuck waiting on the lock held here on ACIDTBL.
+      restartSessionAndDriver(hiveConf);
+    }
+
+    // Locks should be freed up now.
+    ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
+    assertEquals(0, slr.getLocksSize());
+
+    List<String> rows = runStatementOnDriver("show transactions");
+    // Transactions should be committed.
+    // No transactions - just the header row.
+    assertEquals(1, rows.size());
+  }
+
+  private void restartSessionAndDriver(HiveConf conf) throws Exception {
+    SessionState ss = SessionState.get();
+    if (ss != null) {
+      ss.close();
+    }
+    if (d != null) {
+      d.destroy();
+      d.close();
+    }
+
+    SessionState.start(conf);
+    d = new Driver(conf);
+  }
+
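For context, the path this test exercises is the one an external LLAP client takes. A minimal client-side sketch (the JDBC URL and table name are illustrative placeholders, not part of this patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class GetSplitsClientSketch {
      public static void main(String[] args) throws Exception {
        try (Connection conn =
                 DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
             Statement stmt = conn.createStatement();
             // Each result row carries one serialized LLAP input split.
             ResultSet rs = stmt.executeQuery(
                 "select get_splits(\"select * from acidtbl\", 1)")) {
          while (rs.next()) {
            // hand the split bytes to the LLAP external client library
          }
        }
        // Closing the connection ends the HS2 session; with this patch, session
        // close runs the registered cleanup items, which commit the transaction
        // and release the SHARED_READ lock taken by get_splits().
      }
    }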
  // Ideally a test like this should be a qfile test. However, the explain output from qfile is always
  // slightly different depending on where the test is run, specifically due to file size estimation.
  private void testJoin(String engine, String joinType) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index ec228b4..f01edf8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -175,6 +175,19 @@ public class Driver implements CommandProcessor {
   private QueryLifeTimeHookRunner queryLifeTimeHookRunner;
   private final HooksLoader hooksLoader;
 
+  // Transaction manager the Driver has been initialized with (can be null).
+  // If this is set, then this transaction manager will be used during query
+  // compilation/execution rather than the current session's transaction manager.
+  // This might be needed in a situation where a Driver is nested within an already
+  // running Driver/query - the nested Driver requires a separate transaction manager
+  // so as not to conflict with the outer Driver/query, which is using the session
+  // transaction manager.
+  private final HiveTxnManager initTxnMgr;
+
+  // Transaction manager used for the query. This will be set at compile time based on
+  // either initTxnMgr or the SessionState, in that order.
+  private HiveTxnManager queryTxnMgr;
+
   public enum DriverState {
     INITIALIZED,
     COMPILING,
@@ -352,8 +365,12 @@ public class Driver implements CommandProcessor {
     this(getNewQueryState(conf), null);
   }
 
+  public Driver(HiveConf conf, HiveTxnManager txnMgr) {
+    this(getNewQueryState(conf), null, null, txnMgr);
+  }
+
   public Driver(HiveConf conf, Context ctx) {
-    this(getNewQueryState(conf), null);
+    this(getNewQueryState(conf), null, null);
     this.ctx = ctx;
   }
 
@@ -362,18 +379,22 @@
   }
 
   public Driver(QueryState queryState, String userName) {
-    this(queryState, userName, new HooksLoader(queryState.getConf()), null);
+    this(queryState, userName, new HooksLoader(queryState.getConf()), null, null);
   }
 
   public Driver(HiveConf conf, HooksLoader hooksLoader) {
-    this(getNewQueryState(conf), null, hooksLoader, null);
+    this(getNewQueryState(conf), null, hooksLoader, null, null);
   }
 
   public Driver(QueryState queryState, String userName, QueryInfo queryInfo) {
-    this(queryState, userName, new HooksLoader(queryState.getConf()), queryInfo);
+    this(queryState, userName, new HooksLoader(queryState.getConf()), queryInfo, null);
+  }
+
+  public Driver(QueryState queryState, String userName, QueryInfo queryInfo, HiveTxnManager txnMgr) {
+    this(queryState, userName, new HooksLoader(queryState.getConf()), queryInfo, txnMgr);
   }
 
-  public Driver(QueryState queryState, String userName, HooksLoader hooksLoader, QueryInfo queryInfo) {
+  public Driver(QueryState queryState, String userName, HooksLoader hooksLoader, QueryInfo queryInfo, HiveTxnManager txnMgr) {
     this.queryState = queryState;
     this.conf = queryState.getConf();
     isParallelEnabled = (conf != null)
@@ -382,6 +403,7 @@ public class Driver implements CommandProcessor {
     this.hooksLoader = hooksLoader;
     this.queryLifeTimeHookRunner = new QueryLifeTimeHookRunner(conf, hooksLoader, console);
     this.queryInfo = queryInfo;
+    this.initTxnMgr = txnMgr;
   }
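The new Driver(HiveConf, HiveTxnManager) constructor added above is what makes the nesting safe. A minimal sketch of its intended use, mirroring what GenericUDTFGetSplits does later in this patch (the wrapper class is illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class NestedDriverSketch {
      static Driver newNestedDriver() throws Exception {
        // Copy the session conf, but create a brand-new txn manager so the
        // nested query cannot clobber the outer query's transaction state.
        HiveConf conf = new HiveConf(SessionState.get().getConf());
        HiveTxnManager txnMgr =
            TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
        Driver driver = new Driver(conf, txnMgr); // stored as initTxnMgr inside the Driver
        driver.init();
        return driver;
      }
    }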
 
   /**
@@ -477,16 +499,22 @@ public class Driver implements CommandProcessor
 {
     try {
       // Initialize the transaction manager. This must be done before analyze is called.
-      final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);
-      // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
+      if (initTxnMgr != null) {
+        queryTxnMgr = initTxnMgr;
+      } else {
+        queryTxnMgr = SessionState.get().initTxnMgr(conf);
+      }
+      queryState.setTxnManager(queryTxnMgr);
+
+      // In case the user hits Ctrl-C twice to kill the Hive CLI JVM, we want to release locks.
       // if compile is being called multiple times, clear the old shutdownhook
       ShutdownHookManager.removeShutdownHook(shutdownRunner);
+      final HiveTxnManager txnMgr = queryTxnMgr;
       shutdownRunner = new Runnable() {
         @Override
         public void run() {
           try {
-            releaseLocksAndCommitOrRollback(false, txnManager);
+            releaseLocksAndCommitOrRollback(false, txnMgr);
           } catch (LockException e) {
             LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
                 e.getMessage());
@@ -535,13 +563,13 @@
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
-      if(checkConcurrency() && startImplicitTxn(txnManager)) {
+      if(checkConcurrency() && startImplicitTxn(queryTxnMgr)) {
         String userFromUGI = getUserFromUGI();
-        if (!txnManager.isTxnOpen()) {
+        if (!queryTxnMgr.isTxnOpen()) {
           if(userFromUGI == null) {
             return 10;
           }
-          long txnid = txnManager.openTxn(ctx, userFromUGI);
+          long txnid = queryTxnMgr.openTxn(ctx, userFromUGI);
         }
       }
       // Do semantic analysis and plan generation
@@ -1133,13 +1161,12 @@
   // Write the current set of valid transactions into the conf file so that it can be read by
   // the input format.
-  private void recordValidTxns() throws LockException {
+  private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
     ValidTxnList oldList = null;
     String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
     if(s != null && s.length() > 0) {
       oldList = new ValidReadTxnList(s);
     }
-    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
     ValidTxnList txns = txnMgr.getValidTxns();
     if(oldList != null) {
       throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
@@ -1172,6 +1199,7 @@
     }
     return null;
   }
+
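The snapshot that recordValidTxns() publishes above is read back out of the conf by the ACID readers. A condensed sketch of the consumer side (an assumption, simplified from the input-format code, which is not part of this diff):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidTxnList;

    class TxnSnapshotReaderSketch {
      static ValidTxnList readSnapshot(Configuration conf) {
        // recordValidTxns() serialized the snapshot under this key.
        String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
        if (txnString == null || txnString.length() == 0) {
          return new ValidReadTxnList(); // no snapshot recorded
        }
        return new ValidReadTxnList(txnString); // parse the serialized snapshot
      }
    }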
   /**
    * Acquire read and write locks needed by the statement. The list of objects to be locked are
    * obtained from the inputs and outputs populated by the compiler. Locking strategy depends on
@@ -1184,9 +1212,7 @@
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
 
-    SessionState ss = SessionState.get();
-    HiveTxnManager txnMgr = ss.getTxnMgr();
-    if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
+    if(!queryTxnMgr.isTxnOpen() && queryTxnMgr.supportsAcid()) {
       /*non acid txn managers don't support txns but fwd lock requests to lock managers
         acid txn manager requires all locks to be associated with a txn so if we
         end up here w/o an open txn it's because we are processing something like "use <database>
@@ -1205,17 +1231,17 @@
           //so this makes (file name -> data) mapping stable
           acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) -> fsd1.getDirName().compareTo(fsd2.getDirName()));
           for (FileSinkDesc desc : acidSinks) {
-            desc.setTransactionId(txnMgr.getCurrentTxnId());
+            desc.setTransactionId(queryTxnMgr.getCurrentTxnId());
             //it's possible to have > 1 FileSink writing to the same table/partition
             //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
-            desc.setStatementId(txnMgr.getWriteIdAndIncrement());
+            desc.setStatementId(queryTxnMgr.getWriteIdAndIncrement());
           }
         }
       /*It's imperative that {@code acquireLocks()} is called for all commands so that
       HiveTxnManager can transition its state machine correctly*/
-      txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
-      if(txnMgr.recordSnapshot(plan)) {
-        recordValidTxns();
+      queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
+      if(queryTxnMgr.recordSnapshot(plan)) {
+        recordValidTxns(queryTxnMgr);
       }
       return 0;
     } catch (Exception e) {
@@ -1233,6 +1259,11 @@
   private boolean haveAcidWrite() {
     return !plan.getAcidSinks().isEmpty();
   }
+
+  public void releaseLocksAndCommitOrRollback(boolean commit) throws LockException {
+    releaseLocksAndCommitOrRollback(commit, queryTxnMgr);
+  }
+
   /**
    * @param commit if there is an open transaction and if true, commit,
    *               if false rollback. If there is no open transaction this parameter is ignored.
@@ -1246,8 +1277,8 @@
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
     HiveTxnManager txnMgr;
     if (txnManager == null) {
-      SessionState ss = SessionState.get();
-      txnMgr = ss.getTxnMgr();
+      // Default to driver's txn manager if no txn manager specified
+      txnMgr = queryTxnMgr;
     } else {
       txnMgr = txnManager;
     }
@@ -1366,6 +1397,23 @@
     return createProcessorResponse(compileInternal(command, false));
   }
 
+  public CommandProcessorResponse lockAndRespond() {
+    // Assumes the query has already been compiled
+    if (plan == null) {
+      throw new IllegalStateException(
+          "No previously compiled query for driver - queryId=" + queryState.getQueryId());
+    }
+
+    int ret = 0;
+    if (requiresLock()) {
+      ret = acquireLocks();
+    }
+    if (ret != 0) {
+      return rollback(createProcessorResponse(ret));
+    }
+    return createProcessorResponse(ret);
+  }
+
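The new lockAndRespond() splits lock acquisition out of run(), so a caller can compile once and then take locks as a separate step. A sketch of the resulting calling pattern (this is the sequence GenericUDTFGetSplits uses below; the wrapper method is illustrative):

    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;

    class CompileThenLockSketch {
      static void readWithLocks(Driver driver, String query) throws Exception {
        CommandProcessorResponse cpr = driver.compileAndRespond(query);
        if (cpr.getResponseCode() != 0) {
          throw new HiveException("Failed to compile query: " + cpr.getException());
        }
        // lockAndRespond() requires a previously compiled plan; it acquires the
        // locks implied by the plan's inputs/outputs.
        cpr = driver.lockAndRespond();
        if (cpr.getResponseCode() != 0) {
          throw new HiveException("Failed to acquire locks: " + cpr.getException());
        }
        try {
          // ... read table data under the acquired SHARED_READ locks ...
        } finally {
          // Commit the transaction and release the locks.
          driver.releaseLocksAndCommitOrRollback(true);
        }
      }
    }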
" + org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -1536,8 +1584,7 @@ public class Driver implements CommandProcessor { // the reason that we set the txn manager for the cxt here is because each // query has its own ctx object. The txn mgr is shared across the // same instance of Driver, which can run multiple queries. - HiveTxnManager txnManager = SessionState.get().getTxnMgr(); - ctx.setHiveTxnManager(txnManager); + ctx.setHiveTxnManager(queryTxnMgr); if (requiresLock()) { // a checkpoint to see if the thread is interrupted or not before an expensive operation @@ -1559,11 +1606,11 @@ public class Driver implements CommandProcessor { //if needRequireLock is false, the release here will do nothing because there is no lock try { //since set autocommit starts an implicit txn, close it - if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) { - releaseLocksAndCommitOrRollback(true, null); + if(queryTxnMgr.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) { + releaseLocksAndCommitOrRollback(true); } else if(plan.getOperation() == HiveOperation.ROLLBACK) { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } else { //txn (if there is one started) is not finished @@ -1614,7 +1661,7 @@ public class Driver implements CommandProcessor { private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { //console.printError(cpr.toString()); try { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.error("rollback() FAILED: " + cpr);//make sure not to loose @@ -2373,7 +2420,7 @@ public class Driver implements CommandProcessor { if(destroyed) { if (!hiveLocks.isEmpty()) { try { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.warn("Exception when releasing locking in destroy: " + e.getMessage()); @@ -2428,7 +2475,7 @@ public class Driver implements CommandProcessor { } if (!hiveLocks.isEmpty()) { try { - releaseLocksAndCommitOrRollback(false, null); + releaseLocksAndCommitOrRollback(false); } catch (LockException e) { LOG.warn("Exception when releasing locking in destroy: " + e.getMessage()); http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java index fa7c323..7d5aa8b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.HiveOperation; /** @@ -39,6 +40,11 @@ public class QueryState { private HiveOperation commandType; /** + * transaction manager used in the query. + */ + private HiveTxnManager txnManager; + + /** * Private constructor, use QueryState.Builder instead * @param conf The query specific configuration object */ @@ -73,6 +79,14 @@ public class QueryState { return queryConf; } + public HiveTxnManager getTxnManager() { + return txnManager; + } + + public void setTxnManager(HiveTxnManager txnManager) { + this.txnManager = txnManager; + } + /** * Builder to instantiate the QueryState object. 

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 3ad30c4..06e00d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
@@ -78,6 +79,7 @@ import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -123,6 +125,8 @@ public abstract class BaseSemanticAnalyzer {
   // whether any ACID table is involved in a query
   protected boolean acidInQuery;
 
+  protected HiveTxnManager txnManager;
+
   public static final int HIVE_COLUMN_ORDER_ASC = 1;
   public static final int HIVE_COLUMN_ORDER_DESC = 0;
   public static final int HIVE_COLUMN_NULLS_FIRST = 0;
@@ -160,7 +164,6 @@ public abstract class BaseSemanticAnalyzer {
     autoCommitValue = autoCommit;
   }
 
-
   public boolean skipAuthorization() {
     return false;
   }
@@ -230,6 +233,7 @@ public abstract class BaseSemanticAnalyzer {
       idToTableNameMap = new HashMap<String, String>();
       inputs = new LinkedHashSet<ReadEntity>();
       outputs = new LinkedHashSet<WriteEntity>();
+      txnManager = queryState.getTxnManager();
     } catch (Exception e) {
       throw new SemanticException(e);
     }
@@ -1918,4 +1922,11 @@ public abstract class BaseSemanticAnalyzer {
     fetch.setSerializationNullFormat(" ");
     return (FetchTask) TaskFactory.get(fetch, conf);
   }
+
+  protected HiveTxnManager getTxnMgr() {
+    if (txnManager != null) {
+      return txnManager;
+    }
+    return SessionState.get().getTxnMgr();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 551aafc..68ddcf0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -13657,7 +13657,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of, String dest) {
-    if (SessionState.get() == null || !SessionState.get().getTxnMgr().supportsAcid()) {
+    if (SessionState.get() == null || !getTxnMgr().supportsAcid()) {
       return AcidUtils.Operation.NOT_ACID;
     } else if (isAcidOutputFormat(of)) {
       return getAcidType(dest);
@@ -13676,7 +13676,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
   // Make sure the proper transaction manager that supports ACID is being used
   protected void checkAcidTxnManager(Table table) throws SemanticException {
-    if (SessionState.get() != null && !SessionState.get().getTxnMgr().supportsAcid()) {
+    if (SessionState.get() != null && !getTxnMgr().supportsAcid()) {
       throw new SemanticException(ErrorMsg.TXNMGR_NOT_ACID, table.getDbName(), table.getTableName());
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 702be2e..b3193d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -70,7 +70,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       super.analyzeInternal(tree);
     }
     else {
-      if (!SessionState.get().getTxnMgr().supportsAcid()) {
+      if (!getTxnMgr().supportsAcid()) {
         throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
       }
       switch (tree.getToken().getType()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index cceeec0..6dece05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.session;
 
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -290,6 +291,8 @@ public class SessionState {
 
   private String atsDomainId;
 
+  private List<Closeable> cleanupItems = new LinkedList<Closeable>();
+
   /**
    * Get the lineage state stored in this session.
    *
@@ -1651,6 +1654,14 @@
   }
 
   public void close() throws IOException {
+    for (Closeable cleanupItem : cleanupItems) {
+      try {
+        cleanupItem.close();
+      } catch (Exception err) {
+        LOG.error("Error processing SessionState cleanup item " + cleanupItem.toString(), err);
+      }
+    }
+
     registry.clear();
     if (txnMgr != null) txnMgr.closeTxnManager();
     JavaUtils.closeClassLoadersTo(sessionConf.getClassLoader(), parentLoader);
@@ -1927,6 +1938,10 @@
   public KillQuery getKillQuery() {
     return killQuery;
   }
+
+  public void addCleanupItem(Closeable item) {
+    cleanupItems.add(item);
+  }
 }
 
 class ResourceMaps {
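The cleanup-item hook is deliberately generic: anything Closeable can be parked on the session and will be closed when the session closes. A small sketch of registering one (the anonymous Closeable here is illustrative; the DriverCleanup class below is the real user of this API):

    import java.io.Closeable;
    import java.io.IOException;
    import org.apache.hadoop.hive.ql.session.SessionState;

    class CleanupItemSketch {
      static void register() {
        SessionState.get().addCleanupItem(new Closeable() {
          @Override
          public void close() throws IOException {
            // e.g. commit a txn, release locks, destroy a nested Driver
          }
        });
        // SessionState.close() now closes each registered item before it
        // tears down the rest of the session state.
      }
    }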

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 5003f42..610f36f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.udf.generic;
 
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
@@ -69,6 +70,8 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -184,12 +187,21 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     String query = stringOI.getPrimitiveJavaObject(arguments[0]);
     int num = intOI.get(arguments[1]);
 
-    PlanFragment fragment = createPlanFragment(query, num);
+    // Generate applicationId for the LLAP splits
+    LlapCoordinator coordinator = LlapCoordinator.getInstance();
+    if (coordinator == null) {
+      throw new HiveException("LLAP coordinator is not initialized; must be running in HS2 with "
+          + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
+    }
+    ApplicationId applicationId = coordinator.createExtClientAppId();
+    LOG.info("Generated appID {} for LLAP splits", applicationId.toString());
+
+    PlanFragment fragment = createPlanFragment(query, num, applicationId);
     TezWork tezWork = fragment.work;
     Schema schema = fragment.schema;
 
     try {
-      for (InputSplit s : getSplits(jc, num, tezWork, schema)) {
+      for (InputSplit s : getSplits(jc, num, tezWork, schema, applicationId)) {
         Object[] os = new Object[1];
         bos.reset();
         s.write(dos);
@@ -202,7 +214,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
   }
 
-  public PlanFragment createPlanFragment(String query, int num)
+  public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId)
       throws HiveException {
 
     HiveConf conf = new HiveConf(SessionState.get().getConf());
@@ -224,7 +236,17 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       throw new HiveException(e);
     }
 
-    Driver driver = new Driver(conf);
+    // Instantiate a Driver to compile the query passed in.
+    // This UDF is running as part of an existing query, which may already be using the
+    // SessionState TxnManager. If this new Driver also tries to use the same TxnManager,
+    // then this may mess up the existing state of the TxnManager.
+    // So initialize the new Driver with a new TxnManager so that it does not use the
+    // session TxnManager that is already in use.
+    HiveTxnManager txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    Driver driver = new Driver(conf, txnManager);
+    driver.init();
+    DriverCleanup driverCleanup = new DriverCleanup(driver, txnManager, splitsAppId.toString());
+    boolean needsCleanup = true;
     try {
       CommandProcessorResponse cpr = driver.compileAndRespond(query);
       if (cpr.getResponseCode() != 0) {
@@ -276,16 +298,38 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         }
 
         tezWork = ((TezTask)roots.get(0)).getWork();
+      } else {
+        // Table will be queried directly by LLAP.
+        // Acquire locks if necessary - they will be released during session cleanup.
+        // The read will have READ_COMMITTED level semantics.
+        cpr = driver.lockAndRespond();
+        if (cpr.getResponseCode() != 0) {
+          throw new HiveException("Failed to acquire locks: " + cpr.getException());
+        }
+
+        // Attach the resources to the session cleanup.
+        SessionState.get().addCleanupItem(driverCleanup);
+        needsCleanup = false;
       }
 
       return new PlanFragment(tezWork, schema, jc);
     } finally {
-      driver.close();
-      driver.destroy();
+      if (needsCleanup) {
+        if (driverCleanup != null) {
+          try {
+            driverCleanup.close();
+          } catch (IOException err) {
+            throw new HiveException(err);
+          }
+        } else if (driver != null) {
+          driver.close();
+          driver.destroy();
+        }
+      }
     }
   }
 
-  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema)
+  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId)
       throws IOException {
 
     DAG dag = DAG.create(work.getName());
@@ -311,7 +355,6 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
       // Update the queryId to use the generated applicationId. See comment below about
      // why this is done.
-      ApplicationId applicationId = coordinator.createExtClientAppId();
       HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
       Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
           new ArrayList<LocalResource>(), fs, ctx, false, work,
@@ -412,6 +455,37 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
   }
 
+  private static class DriverCleanup implements Closeable {
+    private final Driver driver;
+    private final HiveTxnManager txnManager;
+    private final String applicationId;
+
+    public DriverCleanup(Driver driver, HiveTxnManager txnManager, String applicationId) {
+      this.driver = driver;
+      this.txnManager = txnManager;
+      this.applicationId = applicationId;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        LOG.info("DriverCleanup for LLAP splits: {}", applicationId);
+        driver.releaseLocksAndCommitOrRollback(true);
+        driver.close();
+        driver.destroy();
+        txnManager.closeTxnManager();
+      } catch (Exception err) {
+        LOG.error("Error closing driver resources", err);
+        throw new IOException(err);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "DriverCleanup for LLAP splits: " + applicationId;
+    }
+  }
+
   private static class JobTokenCreator {
     private static Token<JobTokenIdentifier> createJobToken(ApplicationId applicationId) {
       String tokenIdentifier = applicationId.toString();