HIVE-17482: External LLAP client: acquire locks for tables queried directly by LLAP (Jason Dere, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/76d6e8a0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/76d6e8a0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/76d6e8a0

Branch: refs/heads/hive-14535
Commit: 76d6e8a092ed2c73d98f4e128d9620843d00c799
Parents: 2216dad
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Sep 29 11:08:10 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri Sep 29 11:08:10 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/TestAcidOnTez.java    |  87 +++++++++++++++
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 111 +++++++++++++------
 .../org/apache/hadoop/hive/ql/QueryState.java   |  14 +++
 .../hive/ql/parse/BaseSemanticAnalyzer.java     |  13 ++-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   4 +-
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java  |   2 +-
 .../hadoop/hive/ql/session/SessionState.java    |  15 +++
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  90 +++++++++++++--
 8 files changed, 292 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
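
For context: get_splits() is the UDTF an external LLAP client invokes through HS2 to obtain input splits for a query. Before this patch, the query compiled inside the UDTF took no locks, so a concurrent compaction or drop could invalidate the files the external reader was scanning. A minimal sketch of the client-side trigger follows; the JDBC URL and table name are illustrative placeholders, not part of this commit:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class GetSplitsClient {
      public static void main(String[] args) throws Exception {
        // Connect to HS2 (host/port are placeholders).
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:hive2://hs2host:10000/default");
             Statement stmt = conn.createStatement();
             // Each result row carries a serialized split for the LLAP daemons.
             ResultSet rs = stmt.executeQuery(
                 "select get_splits(\"select * from acidtbl\", 1)")) {
          while (rs.next()) {
            // hand the serialized split to the external LLAP reader
          }
        }
      }
    }

With this change, the nested Driver inside the UDTF also calls lockAndRespond(), so a SHARED_READ lock on each source table is held until the client's session closes.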


http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 8b4b21f..ea0aadf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -18,9 +18,13 @@
 
 package org.apache.hadoop.hive.ql;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,14 +32,21 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.After;
@@ -704,6 +715,82 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
       Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
     }
   }
+
+  @Test
+  public void testGetSplitsLocks() throws Exception {
+    // Need to test this with LLAP settings, which requires some additional configurations set.
+    HiveConf modConf = new HiveConf(hiveConf);
+    setupTez(modConf);
+    modConf.setVar(ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    modConf.setVar(ConfVars.HIVEFETCHTASKCONVERSION, "more");
+    modConf.setVar(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS, "localhost");
+
+    // SessionState/Driver needs to be restarted with the Tez conf settings.
+    restartSessionAndDriver(modConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(modConf);
+
+    try {
+      // Request LLAP splits for a table.
+      String queryParam = "select * from " + Table.ACIDTBL;
+      runStatementOnDriver("select get_splits(\"" + queryParam + "\", 1)");
+
+      // The get_splits call should have resulted in a lock on ACIDTBL
+      ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
+      TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED,
+          "default", Table.ACIDTBL.name, null, slr.getLocks());
+      assertEquals(1, slr.getLocksSize());
+
+      // Try another table.
+      queryParam = "select * from " + Table.ACIDTBLPART;
+      runStatementOnDriver("select get_splits(\"" + queryParam + "\", 1)");
+
+      // Should now have new lock on ACIDTBLPART
+      slr = txnHandler.showLocks(new ShowLocksRequest());
+      TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED,
+          "default", Table.ACIDTBLPART.name, null, slr.getLocks());
+      assertEquals(2, slr.getLocksSize());
+
+      // There should be different txn IDs associated with each lock.
+      Set<Long> txnSet = new HashSet<Long>();
+      for (ShowLocksResponseElement lockResponseElem : slr.getLocks()) {
+        txnSet.add(lockResponseElem.getTxnid());
+      }
+      assertEquals(2, txnSet.size());
+
+      List<String> rows = runStatementOnDriver("show transactions");
+      // Header row + 2 transactions = 3 rows
+      assertEquals(3, rows.size());
+    } finally {
+      // Close the session which should free up the TxnHandler/locks held by the session.
+      // Done in the finally block to make sure we free up the locks; otherwise
+      // the cleanup in tearDown() will get stuck waiting on the lock held here on ACIDTBL.
+      restartSessionAndDriver(hiveConf);
+    }
+
+    // Lock should be freed up now.
+    ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
+    assertEquals(0, slr.getLocksSize());
+
+    List<String> rows = runStatementOnDriver("show transactions");
+    // Transactions should be committed.
+    // No transactions - just the header row
+    assertEquals(1, rows.size());
+  }
+
+  private void restartSessionAndDriver(HiveConf conf) throws Exception {
+    SessionState ss = SessionState.get();
+    if (ss != null) {
+      ss.close();
+    }
+    if (d != null) {
+      d.destroy();
+      d.close();
+    }
+
+    SessionState.start(conf);
+    d = new Driver(conf);
+  }
+
  // Ideally test like this should be a qfile test. However, the explain output from qfile is always
  // slightly different depending on where the test is run, specifically due to file size estimation
   private void testJoin(String engine, String joinType) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index ec228b4..f01edf8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -175,6 +175,19 @@ public class Driver implements CommandProcessor {
   private QueryLifeTimeHookRunner queryLifeTimeHookRunner;
   private final HooksLoader hooksLoader;
 
+  // Transaction manager the Driver has been initialized with (can be null).
+  // If this is set then this Transaction manager will be used during query
+  // compilation/execution rather than using the current session's transaction manager.
+  // This might be needed in a situation where a Driver is nested within an already
+  // running Driver/query - the nested Driver requires a separate transaction manager
+  // so as not to conflict with the outer Driver/query which is using the session
+  // transaction manager.
+  private final HiveTxnManager initTxnMgr;
+
+  // Transaction manager used for the query. This will be set at compile time based on
+  // either initTxnMgr or from the SessionState, in that order.
+  private HiveTxnManager queryTxnMgr;
+
   public enum DriverState {
     INITIALIZED,
     COMPILING,
@@ -352,8 +365,12 @@ public class Driver implements CommandProcessor {
     this(getNewQueryState(conf), null);
   }
 
+  public Driver(HiveConf conf, HiveTxnManager txnMgr) {
+    this(getNewQueryState(conf), null, null, txnMgr);
+  }
+
   public Driver(HiveConf conf, Context ctx) {
-    this(getNewQueryState(conf), null);
+    this(getNewQueryState(conf), null, null);
     this.ctx = ctx;
   }
 
@@ -362,18 +379,22 @@ public class Driver implements CommandProcessor {
   }
 
   public Driver(QueryState queryState, String userName) {
-    this(queryState, userName, new HooksLoader(queryState.getConf()), null);
+    this(queryState, userName, new HooksLoader(queryState.getConf()), null, null);
   }
 
   public Driver(HiveConf conf, HooksLoader hooksLoader) {
-    this(getNewQueryState(conf), null, hooksLoader, null);
+    this(getNewQueryState(conf), null, hooksLoader, null, null);
   }
 
   public Driver(QueryState queryState, String userName, QueryInfo queryInfo) {
-     this(queryState, userName, new HooksLoader(queryState.getConf()), queryInfo);
+     this(queryState, userName, new HooksLoader(queryState.getConf()), queryInfo, null);
+  }
+
+  public Driver(QueryState queryState, String userName, QueryInfo queryInfo, HiveTxnManager txnMgr) {
+    this(queryState, userName, new HooksLoader(queryState.getConf()), queryInfo, txnMgr);
   }
 
-  public Driver(QueryState queryState, String userName, HooksLoader hooksLoader, QueryInfo queryInfo) {
+  public Driver(QueryState queryState, String userName, HooksLoader hooksLoader, QueryInfo queryInfo, HiveTxnManager txnMgr) {
     this.queryState = queryState;
     this.conf = queryState.getConf();
     isParallelEnabled = (conf != null)
@@ -382,6 +403,7 @@ public class Driver implements CommandProcessor {
     this.hooksLoader = hooksLoader;
     this.queryLifeTimeHookRunner = new QueryLifeTimeHookRunner(conf, hooksLoader, console);
     this.queryInfo = queryInfo;
+    this.initTxnMgr = txnMgr;
   }
 
   /**
@@ -477,16 +499,22 @@ public class Driver implements CommandProcessor {
     try {
 
       // Initialize the transaction manager.  This must be done before analyze is called.
-      final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);
-      // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
+      if (initTxnMgr != null) {
+        queryTxnMgr = initTxnMgr;
+      } else {
+        queryTxnMgr = SessionState.get().initTxnMgr(conf);
+      }
+      queryState.setTxnManager(queryTxnMgr);
 
+      // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks
       // if compile is being called multiple times, clear the old shutdownhook
       ShutdownHookManager.removeShutdownHook(shutdownRunner);
+      final HiveTxnManager txnMgr = queryTxnMgr;
       shutdownRunner = new Runnable() {
         @Override
         public void run() {
           try {
-            releaseLocksAndCommitOrRollback(false, txnManager);
+            releaseLocksAndCommitOrRollback(false, txnMgr);
           } catch (LockException e) {
             LOG.warn("Exception when releasing locks in ShutdownHook for 
Driver: " +
                 e.getMessage());
@@ -535,13 +563,13 @@ public class Driver implements CommandProcessor {
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
-      if(checkConcurrency() && startImplicitTxn(txnManager)) {
+      if(checkConcurrency() && startImplicitTxn(queryTxnMgr)) {
         String userFromUGI = getUserFromUGI();
-        if (!txnManager.isTxnOpen()) {
+        if (!queryTxnMgr.isTxnOpen()) {
           if(userFromUGI == null) {
             return 10;
           }
-          long txnid = txnManager.openTxn(ctx, userFromUGI);
+          long txnid = queryTxnMgr.openTxn(ctx, userFromUGI);
         }
       }
       // Do semantic analysis and plan generation
@@ -1133,13 +1161,12 @@ public class Driver implements CommandProcessor {
 
  // Write the current set of valid transactions into the conf file so that it can be read by
   // the input format.
-  private void recordValidTxns() throws LockException {
+  private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
     ValidTxnList oldList = null;
     String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
     if(s != null && s.length() > 0) {
       oldList = new ValidReadTxnList(s);
     }
-    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
     ValidTxnList txns = txnMgr.getValidTxns();
     if(oldList != null) {
       throw new IllegalStateException("calling recordValidTxn() more than once 
in the same " +
@@ -1172,6 +1199,7 @@ public class Driver implements CommandProcessor {
     }
     return null;
   }
+
   /**
   * Acquire read and write locks needed by the statement. The list of objects to be locked are
   * obtained from the inputs and outputs populated by the compiler.  Locking strategy depends on
@@ -1184,9 +1212,7 @@ public class Driver implements CommandProcessor {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
 
-    SessionState ss = SessionState.get();
-    HiveTxnManager txnMgr = ss.getTxnMgr();
-    if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
+    if(!queryTxnMgr.isTxnOpen() && queryTxnMgr.supportsAcid()) {
       /*non acid txn managers don't support txns but fwd lock requests to lock managers
         acid txn manager requires all locks to be associated with a txn so if we
         end up here w/o an open txn it's because we are processing something like "use <database>
@@ -1205,17 +1231,17 @@ public class Driver implements CommandProcessor {
         //so this makes (file name -> data) mapping stable
         acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) -> fsd1.getDirName().compareTo(fsd2.getDirName()));
         for (FileSinkDesc desc : acidSinks) {
-          desc.setTransactionId(txnMgr.getCurrentTxnId());
+          desc.setTransactionId(queryTxnMgr.getCurrentTxnId());
           //it's possible to have > 1 FileSink writing to the same table/partition
           //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
-          desc.setStatementId(txnMgr.getWriteIdAndIncrement());
+          desc.setStatementId(queryTxnMgr.getWriteIdAndIncrement());
         }
       }
       /*It's imperative that {@code acquireLocks()} is called for all commands so that
       HiveTxnManager can transition its state machine correctly*/
-      txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
-      if(txnMgr.recordSnapshot(plan)) {
-        recordValidTxns();
+      queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
+      if(queryTxnMgr.recordSnapshot(plan)) {
+        recordValidTxns(queryTxnMgr);
       }
       return 0;
     } catch (Exception e) {
@@ -1233,6 +1259,11 @@ public class Driver implements CommandProcessor {
   private boolean haveAcidWrite() {
     return !plan.getAcidSinks().isEmpty();
   }
+
+  public void releaseLocksAndCommitOrRollback(boolean commit) throws LockException {
+    releaseLocksAndCommitOrRollback(commit, queryTxnMgr);
+  }
+
   /**
    * @param commit if there is an open transaction and if true, commit,
    *               if false rollback.  If there is no open transaction this parameter is ignored.
@@ -1246,8 +1277,8 @@ public class Driver implements CommandProcessor {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
     HiveTxnManager txnMgr;
     if (txnManager == null) {
-      SessionState ss = SessionState.get();
-      txnMgr = ss.getTxnMgr();
+      // Default to driver's txn manager if no txn manager specified
+      txnMgr = queryTxnMgr;
     } else {
       txnMgr = txnManager;
     }
@@ -1366,6 +1397,23 @@ public class Driver implements CommandProcessor {
     return createProcessorResponse(compileInternal(command, false));
   }
 
+  public CommandProcessorResponse lockAndRespond() {
+    // Assumes the query has already been compiled
+    if (plan == null) {
+      throw new IllegalStateException(
+          "No previously compiled query for driver - queryId=" + 
queryState.getQueryId());
+    }
+
+    int ret = 0;
+    if (requiresLock()) {
+      ret = acquireLocks();
+    }
+    if (ret != 0) {
+      return rollback(createProcessorResponse(ret));
+    }
+    return createProcessorResponse(ret);
+  }
+
   private static final ReentrantLock globalCompileLock = new ReentrantLock();
   private int compileInternal(String command, boolean deferClose) {
     int ret;
@@ -1396,7 +1444,7 @@ public class Driver implements CommandProcessor {
 
     if (ret != 0) {
       try {
-        releaseLocksAndCommitOrRollback(false, null);
+        releaseLocksAndCommitOrRollback(false);
       } catch (LockException e) {
         LOG.warn("Exception in releasing locks. "
             + org.apache.hadoop.util.StringUtils.stringifyException(e));
@@ -1536,8 +1584,7 @@ public class Driver implements CommandProcessor {
       // the reason that we set the txn manager for the cxt here is because each
       // query has its own ctx object. The txn mgr is shared across the
       // same instance of Driver, which can run multiple queries.
-      HiveTxnManager txnManager = SessionState.get().getTxnMgr();
-      ctx.setHiveTxnManager(txnManager);
+      ctx.setHiveTxnManager(queryTxnMgr);
 
       if (requiresLock()) {
        // a checkpoint to see if the thread is interrupted or not before an expensive operation
@@ -1559,11 +1606,11 @@ public class Driver implements CommandProcessor {
      //if needRequireLock is false, the release here will do nothing because there is no lock
       try {
         //since set autocommit starts an implicit txn, close it
-        if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
-          releaseLocksAndCommitOrRollback(true, null);
+        if(queryTxnMgr.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
+          releaseLocksAndCommitOrRollback(true);
         }
         else if(plan.getOperation() == HiveOperation.ROLLBACK) {
-          releaseLocksAndCommitOrRollback(false, null);
+          releaseLocksAndCommitOrRollback(false);
         }
         else {
           //txn (if there is one started) is not finished
@@ -1614,7 +1661,7 @@ public class Driver implements CommandProcessor {
   private CommandProcessorResponse rollback(CommandProcessorResponse cpr) {
     //console.printError(cpr.toString());
     try {
-      releaseLocksAndCommitOrRollback(false, null);
+      releaseLocksAndCommitOrRollback(false);
     }
     catch (LockException e) {
       LOG.error("rollback() FAILED: " + cpr);//make sure not to loose
@@ -2373,7 +2420,7 @@ public class Driver implements CommandProcessor {
     if(destroyed) {
       if (!hiveLocks.isEmpty()) {
         try {
-          releaseLocksAndCommitOrRollback(false, null);
+          releaseLocksAndCommitOrRollback(false);
         } catch (LockException e) {
           LOG.warn("Exception when releasing locking in destroy: " +
               e.getMessage());
@@ -2428,7 +2475,7 @@ public class Driver implements CommandProcessor {
     }
     if (!hiveLocks.isEmpty()) {
       try {
-        releaseLocksAndCommitOrRollback(false, null);
+        releaseLocksAndCommitOrRollback(false);
       } catch (LockException e) {
         LOG.warn("Exception when releasing locking in destroy: " +
             e.getMessage());
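
The Driver changes above split the old session-coupled flow into three independently callable steps: compile against an injected transaction manager, lock the compiled plan, and release/commit later. A condensed sketch of the intended calling sequence (a sketch under the assumption that conf is a valid HiveConf; error handling is shortened to exceptions, and the query string is a stand-in):

    // Assumed imports (all exist in this codebase):
    //   org.apache.hadoop.hive.ql.Driver
    //   org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager
    //   org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory
    //   org.apache.hadoop.hive.ql.metadata.HiveException
    //   org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

    // Build a Driver around its own txn manager instead of the session's
    // (the new Driver(HiveConf, HiveTxnManager) constructor added above).
    HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
    Driver driver = new Driver(conf, txnMgr);
    driver.init();
    try {
      // Compile; this also opens a txn on queryTxnMgr when concurrency is enabled.
      CommandProcessorResponse cpr = driver.compileAndRespond("select * from acidtbl");
      if (cpr.getResponseCode() != 0) {
        throw new HiveException("compile failed: " + cpr.getException());
      }
      // New entry point: lock the already-compiled plan.
      cpr = driver.lockAndRespond();
      if (cpr.getResponseCode() != 0) {
        throw new HiveException("lock acquisition failed: " + cpr.getException());
      }
      // ... use the compiled plan while the SHARED_READ locks are held ...
    } finally {
      // New overload defaulting to the driver's own queryTxnMgr.
      driver.releaseLocksAndCommitOrRollback(true);
      driver.close();
      driver.destroy();
      txnMgr.closeTxnManager();
    }

GenericUDTFGetSplits below is the real caller; it defers the finally-block release to session cleanup instead.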

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index fa7c323..7d5aa8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql;
 import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 
 /**
@@ -39,6 +40,11 @@ public class QueryState {
   private HiveOperation commandType;
 
   /**
+   * transaction manager used in the query.
+   */
+  private HiveTxnManager txnManager;
+
+  /**
    * Private constructor, use QueryState.Builder instead
    * @param conf The query specific configuration object
    */
@@ -73,6 +79,14 @@ public class QueryState {
     return queryConf;
   }
 
+  public HiveTxnManager getTxnManager() {
+    return txnManager;
+  }
+
+  public void setTxnManager(HiveTxnManager txnManager) {
+    this.txnManager = txnManager;
+  }
+
   /**
    * Builder to instantiate the QueryState object.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 3ad30c4..06e00d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
@@ -78,6 +79,7 @@ import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -123,6 +125,8 @@ public abstract class BaseSemanticAnalyzer {
   // whether any ACID table is involved in a query
   protected boolean acidInQuery;
 
+  protected HiveTxnManager txnManager;
+
   public static final int HIVE_COLUMN_ORDER_ASC = 1;
   public static final int HIVE_COLUMN_ORDER_DESC = 0;
   public static final int HIVE_COLUMN_NULLS_FIRST = 0;
@@ -160,7 +164,6 @@ public abstract class BaseSemanticAnalyzer {
     autoCommitValue = autoCommit;
   }
 
-
   public boolean skipAuthorization() {
     return false;
   }
@@ -230,6 +233,7 @@ public abstract class BaseSemanticAnalyzer {
       idToTableNameMap = new HashMap<String, String>();
       inputs = new LinkedHashSet<ReadEntity>();
       outputs = new LinkedHashSet<WriteEntity>();
+      txnManager = queryState.getTxnManager();
     } catch (Exception e) {
       throw new SemanticException(e);
     }
@@ -1918,4 +1922,11 @@ public abstract class BaseSemanticAnalyzer {
     fetch.setSerializationNullFormat(" ");
     return (FetchTask) TaskFactory.get(fetch, conf);
   }
+
+  protected HiveTxnManager getTxnMgr() {
+    if (txnManager != null) {
+      return txnManager;
+    }
+    return SessionState.get().getTxnMgr();
+  }
 }
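
The getTxnMgr() helper above gives every analyzer a single lookup rule: prefer the transaction manager that Driver recorded in QueryState at compile time, and fall back to the session-wide manager only for code paths that never set one. Restated as a sketch (names are those from the diff):

    // Effective resolution order inside BaseSemanticAnalyzer.getTxnMgr():
    HiveTxnManager mgr = (txnManager != null)
        ? txnManager                        // per-query manager from QueryState
        : SessionState.get().getTxnMgr();   // legacy session-wide fallback

This is what lets the SemanticAnalyzer and UpdateDeleteSemanticAnalyzer changes below drop their direct SessionState.get().getTxnMgr() calls.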

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 551aafc..68ddcf0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -13657,7 +13657,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of, String dest) {
-    if (SessionState.get() == null || !SessionState.get().getTxnMgr().supportsAcid()) {
+    if (SessionState.get() == null || !getTxnMgr().supportsAcid()) {
       return AcidUtils.Operation.NOT_ACID;
     } else if (isAcidOutputFormat(of)) {
       return getAcidType(dest);
@@ -13676,7 +13676,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
   // Make sure the proper transaction manager that supports ACID is being used
   protected void checkAcidTxnManager(Table table) throws SemanticException {
-    if (SessionState.get() != null && !SessionState.get().getTxnMgr().supportsAcid()) {
+    if (SessionState.get() != null && !getTxnMgr().supportsAcid()) {
       throw new SemanticException(ErrorMsg.TXNMGR_NOT_ACID, table.getDbName(), table.getTableName());
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 702be2e..b3193d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -70,7 +70,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       super.analyzeInternal(tree);
     } else {
 
-      if (!SessionState.get().getTxnMgr().supportsAcid()) {
+      if (!getTxnMgr().supportsAcid()) {
         throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
       }
       switch (tree.getToken().getType()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index cceeec0..6dece05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.session;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -290,6 +291,8 @@ public class SessionState {
 
   private String atsDomainId;
 
+  private List<Closeable> cleanupItems = new LinkedList<Closeable>();
+
   /**
    * Get the lineage state stored in this session.
    *
@@ -1651,6 +1654,14 @@ public class SessionState {
   }
 
   public void close() throws IOException {
+    for (Closeable cleanupItem : cleanupItems) {
+      try {
+        cleanupItem.close();
+      } catch (Exception err) {
+        LOG.error("Error processing SessionState cleanup item " + 
cleanupItem.toString(), err);
+      }
+    }
+
     registry.clear();
     if (txnMgr != null) txnMgr.closeTxnManager();
     JavaUtils.closeClassLoadersTo(sessionConf.getClassLoader(), parentLoader);
@@ -1927,6 +1938,10 @@ public class SessionState {
   public KillQuery getKillQuery() {
     return killQuery;
   }
+
+  public void addCleanupItem(Closeable item) {
+    cleanupItems.add(item);
+  }
 }
 
 class ResourceMaps {
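
The SessionState change is deliberately generic: any Closeable registered via addCleanupItem() is closed before the session tears down its registry and transaction manager, and errors are logged rather than rethrown, so one failing item cannot block the rest. A hedged usage sketch; the anonymous Closeable stands in for DriverCleanup from the UDTF below:

    import java.io.Closeable;
    import java.io.IOException;

    // Somewhere while the session is live:
    SessionState.get().addCleanupItem(new Closeable() {
      @Override
      public void close() throws IOException {
        // release locks / free resources tied to the session's lifetime
      }
    });
    // Later, SessionState.close() drains the cleanup list before closing txnMgr.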

http://git-wip-us.apache.org/repos/asf/hive/blob/76d6e8a0/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 5003f42..610f36f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.udf.generic;
 
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
@@ -69,6 +70,8 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -184,12 +187,21 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     String query = stringOI.getPrimitiveJavaObject(arguments[0]);
     int num = intOI.get(arguments[1]);
 
-    PlanFragment fragment = createPlanFragment(query, num);
+    // Generate applicationId for the LLAP splits
+    LlapCoordinator coordinator = LlapCoordinator.getInstance();
+    if (coordinator == null) {
+      throw new HiveException("LLAP coordinator is not initialized; must be 
running in HS2 with "
+          + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
+    }
+    ApplicationId applicationId = coordinator.createExtClientAppId();
+    LOG.info("Generated appID {} for LLAP splits", applicationId.toString());
+
+    PlanFragment fragment = createPlanFragment(query, num, applicationId);
     TezWork tezWork = fragment.work;
     Schema schema = fragment.schema;
 
     try {
-      for (InputSplit s : getSplits(jc, num, tezWork, schema)) {
+      for (InputSplit s : getSplits(jc, num, tezWork, schema, applicationId)) {
         Object[] os = new Object[1];
         bos.reset();
         s.write(dos);
@@ -202,7 +214,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
   }
 
-  public PlanFragment createPlanFragment(String query, int num)
+  public PlanFragment createPlanFragment(String query, int num, ApplicationId splitsAppId)
       throws HiveException {
 
     HiveConf conf = new HiveConf(SessionState.get().getConf());
@@ -224,7 +236,17 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       throw new HiveException(e);
     }
 
-    Driver driver = new Driver(conf);
+    // Instantiate Driver to compile the query passed in.
+    // This UDF is running as part of an existing query, which may already be using the
+    // SessionState TxnManager. If this new Driver also tries to use the same TxnManager
+    // then this may mess up the existing state of the TxnManager.
+    // So initialize the new Driver with a new TxnManager so that it does not use the
+    // Session TxnManager that is already in use.
+    HiveTxnManager txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    Driver driver = new Driver(conf, txnManager);
+    driver.init();
+    DriverCleanup driverCleanup = new DriverCleanup(driver, txnManager, splitsAppId.toString());
+    boolean needsCleanup = true;
     try {
       CommandProcessorResponse cpr = driver.compileAndRespond(query);
       if (cpr.getResponseCode() != 0) {
@@ -276,16 +298,38 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         }
 
         tezWork = ((TezTask)roots.get(0)).getWork();
+      } else {
+        // Table will be queried directly by LLAP
+        // Acquire locks if necessary - they will be released during session cleanup.
+        // The read will have READ_COMMITTED level semantics.
+        cpr = driver.lockAndRespond();
+        if (cpr.getResponseCode() != 0) {
+          throw new HiveException("Failed to acquire locks: " + 
cpr.getException());
+        }
+
+        // Attach the resources to the session cleanup.
+        SessionState.get().addCleanupItem(driverCleanup);
+        needsCleanup = false;
       }
 
       return new PlanFragment(tezWork, schema, jc);
     } finally {
-      driver.close();
-      driver.destroy();
+      if (needsCleanup) {
+        if (driverCleanup != null) {
+          try {
+            driverCleanup.close();
+          } catch (IOException err) {
+            throw new HiveException(err);
+          }
+        } else if (driver != null) {
+          driver.close();
+          driver.destroy();
+        }
+      }
     }
   }
 
-  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema)
+  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema, ApplicationId applicationId)
     throws IOException {
 
     DAG dag = DAG.create(work.getName());
@@ -311,7 +355,6 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
      // Update the queryId to use the generated applicationId. See comment below about
       // why this is done.
-      ApplicationId applicationId = coordinator.createExtClientAppId();
      HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
       Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
           new ArrayList<LocalResource>(), fs, ctx, false, work,
@@ -412,6 +455,37 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
   }
 
+  private static class DriverCleanup implements Closeable {
+    private final Driver driver;
+    private final HiveTxnManager txnManager;
+    private final String applicationId;
+
+    public DriverCleanup(Driver driver, HiveTxnManager txnManager, String applicationId) {
+      this.driver = driver;
+      this.txnManager = txnManager;
+      this.applicationId = applicationId;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        LOG.info("DriverCleanup for LLAP splits: {}", applicationId);
+        driver.releaseLocksAndCommitOrRollback(true);
+        driver.close();
+        driver.destroy();
+        txnManager.closeTxnManager();
+      } catch (Exception err) {
+        LOG.error("Error closing driver resources", err);
+        throw new IOException(err);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "DriverCleanup for LLAP splits: " + applicationId;
+    }
+  }
+
   private static class JobTokenCreator {
    private static Token<JobTokenIdentifier> createJobToken(ApplicationId applicationId) {
       String tokenIdentifier = applicationId.toString();
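
Putting the GenericUDTFGetSplits pieces together, the deferred-release path for a table read directly by LLAP looks like this in outline (a summary sketch; the session wiring is simplified, and appId comes from the LLAP coordinator as above):

    // Inside createPlanFragment(), when LLAP will read the table directly:
    cpr = driver.lockAndRespond();                       // SHARED_READ locks acquired
    SessionState.get().addCleanupItem(driverCleanup);    // park locks on the session
    needsCleanup = false;                                // skip the finally-block release

    // Much later, when the external client's session closes:
    // SessionState.close() -> DriverCleanup.close()
    //   -> driver.releaseLocksAndCommitOrRollback(true)  // commit txn, free locks
    //   -> driver.close(); driver.destroy(); txnManager.closeTxnManager();

This is also why the read gets READ_COMMITTED-level semantics, as the comment in createPlanFragment() notes: the snapshot is pinned only for as long as the external client's session, and therefore its transaction and locks, stays open.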
