Sankar Hariappan created HIVE-20627:
---------------------------------------

             Summary: Concurrent async queries from the same session intermittently 
fail with LockException.
                 Key: HIVE-20627
                 URL: https://issues.apache.org/jira/browse/HIVE-20627
             Project: Hive
          Issue Type: Bug
          Components: HiveServer2
    Affects Versions: 4.0.0, 3.2.0
            Reporter: Sankar Hariappan
            Assignee: Sankar Hariappan


When multiple async queries are executed from the same session, the async query 
execution DAGs all share the single Hive object set by the caller for every 
thread. When loading dynamic partitions, the generated MoveTask re-creates the 
Hive object and, in doing so, closes the shared one, causing metastore 
connection failures in the other async execution threads that still access it. 
The same problem appears when a ReplDumpTask or ReplLoadTask is part of the DAG.

*Root cause:*
For async query execution, SQLOperation.runInternal sets the thread-local Hive 
object of every child thread to parentHive (parentSession.getSessionHive()):
{code}
@Override
public void run() {
  PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws HiveSQLException {
      Hive.set(parentHive); // Setting parentHive for all async operations.
      // TODO: can this result in cross-thread reuse of session state?
      SessionState.setCurrentSessionState(parentSessionState);
      PerfLogger.setPerfLogger(parentPerfLogger);
      LogUtils.registerLoggingContext(queryState.getConf());
      try {
        if (asyncPrepare) {
          prepare(queryState);
        }
        runQuery();
      } catch (HiveSQLException e) {
        // TODO: why do we invent our own error path op top of the one from Future.get?
        setOperationException(e);
        LOG.error("Error running hive query: ", e);
      } finally {
        LogUtils.unregisterLoggingContext();
      }
      return null;
    }
  };
{code}

Now, while async execution is in progress, if one of the threads re-creates the 
Hive object, it first closes the parentHive object. That breaks every other 
thread still using it: the conf object they reference is cleaned up, so they 
read null for the VALID_TXNS_KEY value and fail with a LockException.
{code}
private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns)
    throws HiveException {
  if (db != null) {
    LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
        ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
    db.close();
  }
  closeCurrent();
  if (c == null) {
    c = createHiveConf();
  }
  c.set("fs.scheme.class", "dfs");
  Hive newdb = new Hive(c, doRegisterAllFns);
  hiveDB.set(newdb);
  return newdb;
}
{code}
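The close race can be reduced to a small standalone sketch. This is not Hive code: SharedResource is a stand-in for the Hive object whose close() tears down the metastore connection. One thread closes the shared object while a second thread still holds the reference, and the second thread's call then fails, just as the async query threads fail once create() has closed parentHive.

{code}
import java.util.concurrent.*;

// Standalone sketch (not Hive code): SharedResource stands in for the Hive
// object whose close() releases the metastore connection.
class SharedResource {
  private volatile boolean closed = false;

  void close() {
    closed = true; // analogous to Hive.close() tearing down the metastore client
  }

  String query() {
    if (closed) {
      // analogous to reading null for VALID_TXNS_KEY and failing with LockException
      throw new IllegalStateException("resource already closed");
    }
    return "ok";
  }
}

public class SharedCloseRace {
  public static void main(String[] args) throws Exception {
    SharedResource parentHive = new SharedResource();
    ExecutorService pool = Executors.newFixedThreadPool(2);

    // Thread A re-creates its Hive object, closing the shared one first.
    pool.submit(parentHive::close).get();

    // Thread B still holds the old reference; its call now fails.
    Future<String> b = pool.submit(parentHive::query);
    try {
      b.get();
    } catch (ExecutionException e) {
      // prints: second thread failed: resource already closed
      System.out.println("second thread failed: " + e.getCause().getMessage());
    }
    pool.shutdown();
  }
}
{code}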

*Fix:*
We shouldn't close the old Hive object when it is shared by multiple threads. 
A flag on the Hive object can track whether it is safe to close.
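The flag-based guard can be sketched as follows; the field name allowClose (and the class name) are illustrative assumptions, not necessarily what the actual patch uses:

{code}
// Illustrative sketch of the proposed fix; "allowClose" is an assumed name,
// not necessarily what the actual Hive patch calls it.
public class GuardedHive {
  private boolean allowClose = true; // set to false while shared across async threads
  private boolean closed = false;

  public void setAllowClose(boolean allowClose) {
    this.allowClose = allowClose;
  }

  public boolean isClosed() {
    return closed;
  }

  public void close() {
    if (!allowClose) {
      return; // shared by other async threads; only the owning session may close it
    }
    closed = true; // release the metastore connection, conf, etc.
  }
}
{code}

With this shape, SQLOperation would mark parentHive as not closeable before spawning the async threads, so that a MoveTask re-creating its own Hive object skips the close, and only the owning session closes parentHive at session shutdown.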

*Memory leak issue:*
A memory leak occurs if one of the threads in Hive.loadDynamicPartitions throws 
an exception. rawStoreMap stores the RawStore objects that must be cleaned up 
later, but it is populated only on the success path; when an exception is 
thrown, the entry is never added and the RawStore leaks.
{code}
futures.add(pool.submit(new Callable<Void>() {
  @Override
  public Void call() throws Exception {
    try {
      // move file would require session details (needCopy() invokes SessionState.get)
      SessionState.setCurrentSessionState(parentSession);
      LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);

      // load the partition
      Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
          true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
          isInsertOverwrite);
      partitionsMap.put(fullPartSpec, newPartition);

      if (inPlaceEligible) {
        synchronized (ps) {
          InPlaceUpdate.rePositionCursor(ps);
          partitionsLoaded.incrementAndGet();
          InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
              + partsToLoad + " partitions.");
        }
      }
      // Add embedded rawstore, so we can cleanup later to avoid memory leak
      if (getMSC().isLocalMetaStore()) {
        if (!rawStoreMap.containsKey(Thread.currentThread().getId())) {
          rawStoreMap.put(Thread.currentThread().getId(),
              HiveMetaStore.HMSHandler.getRawStore());
        }
      }
      return null;
    } catch (Exception t) {
      // on this path rawStoreMap is never populated, so the RawStore leaks
    }
{code}
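A hedged sketch of the cleanup fix: populate rawStoreMap in a finally block, so the per-thread entry is recorded on both the success and the exception path. loadOnePartition and the String value standing in for a RawStore are illustrative, not Hive APIs.

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the leak fix: record the per-thread RawStore handle in a finally
// block so it is captured even when the partition load throws.
// loadOnePartition and the String value are stand-ins, not Hive APIs.
public class RawStoreCleanup {
  static final Map<Long, String> rawStoreMap = new ConcurrentHashMap<>();

  static void loadOnePartition(boolean fail) {
    try {
      if (fail) {
        throw new RuntimeException("partition load failed");
      }
    } finally {
      // runs on both the success and the exception path
      rawStoreMap.putIfAbsent(Thread.currentThread().getId(),
          "rawstore-for-thread-" + Thread.currentThread().getId());
    }
  }

  public static void main(String[] args) throws Exception {
    Thread ok = new Thread(() -> loadOnePartition(false));
    Thread bad = new Thread(() -> {
      try {
        loadOnePartition(true);
      } catch (RuntimeException ignored) {
        // the load failed, but its RawStore entry was still recorded
      }
    });
    ok.start();
    bad.start();
    ok.join();
    bad.join();
    // both worker threads are tracked, so their RawStores can be shut down later
    System.out.println("threads tracked: " + rawStoreMap.size()); // prints 2
  }
}
{code}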



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
