[ https://issues.apache.org/jira/browse/HIVE-20627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sankar Hariappan updated HIVE-20627:
------------------------------------
    Summary: Concurrent async queries intermittently fail with LockException 
and cause a memory leak.  (was: Concurrent Async queries intermittently fails 
with LockException and cause memory leak.)

> Concurrent async queries intermittently fail with LockException and cause a 
> memory leak.
> ---------------------------------------------------------------------------------------
>
>                 Key: HIVE-20627
>                 URL: https://issues.apache.org/jira/browse/HIVE-20627
>             Project: Hive
>          Issue Type: Bug
>          Components: HiveServer2
>    Affects Versions: 4.0.0, 3.2.0
>            Reporter: Sankar Hariappan
>            Assignee: Sankar Hariappan
>            Priority: Major
>
> When multiple async queries are executed from the same session, the async 
> query execution DAGs share the same Hive object, which the caller sets for 
> all the threads. When dynamic partitions are loaded, a MoveTask is created 
> that re-creates the Hive object and closes the shared one, causing metastore 
> connection failures in the other async execution threads that still access 
> it. The same problem is seen when ReplDumpTask and ReplLoadTask are part of 
> the DAG.
> *Root cause:*
> For async query execution, SQLOperation.runInternal sets the thread-local 
> Hive object of every child thread to parentHive 
> (parentSession.getSessionHive()):
> {code}
> @Override
> public void run() {
>   PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
>     @Override
>     public Object run() throws HiveSQLException {
>       Hive.set(parentHive); // Setting parentHive for all async operations.
>       // TODO: can this result in cross-thread reuse of session state?
>       SessionState.setCurrentSessionState(parentSessionState);
>       PerfLogger.setPerfLogger(parentPerfLogger);
>       LogUtils.registerLoggingContext(queryState.getConf());
>       try {
>         if (asyncPrepare) {
>           prepare(queryState);
>         }
>         runQuery();
>       } catch (HiveSQLException e) {
>         // TODO: why do we invent our own error path on top of the one from Future.get?
>         setOperationException(e);
>         LOG.error("Error running hive query: ", e);
>       } finally {
>         LogUtils.unregisterLoggingContext();
>       }
>       return null;
>     }
>   };
> {code}
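> For context: Hive.set(...) stores the object in a per-thread slot (a 
> ThreadLocal), so every child thread gets its own slot, but here all the 
> slots end up pointing at the same parentHive instance. A minimal JDK-only 
> sketch of that sharing (hypothetical names, not the Hive classes):
> {code}
> import java.util.concurrent.CountDownLatch;
> 
> public class SharedThreadLocalDemo {
>   // Hypothetical stand-in for Hive's per-thread slot (hiveDB).
>   private static final ThreadLocal<Object> slot = new ThreadLocal<>();
> 
>   public static void main(String[] args) throws InterruptedException {
>     final Object parentHive = new Object(); // one instance, like parentSession.getSessionHive()
>     final CountDownLatch done = new CountDownLatch(2);
>     Runnable task = () -> {
>       slot.set(parentHive); // each thread fills its own slot...
>       // ...but both slots reference the SAME object:
>       System.out.println(Thread.currentThread().getName()
>           + " sees identity " + System.identityHashCode(slot.get()));
>       done.countDown();
>     };
>     new Thread(task, "async-1").start();
>     new Thread(task, "async-2").start();
>     done.await(); // both threads print the same identity hash code
>   }
> }
> {code}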
> Now, while async execution is in progress, if one of the threads re-creates 
> the Hive object, it first closes the parentHive object. That impacts the 
> other threads still using it: the conf object it refers to gets cleaned up, 
> and so the VALID_TXNS_KEY value resolves to null (a stripped-down 
> reproduction follows the create() snippet below).
> {code}
> private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns)
>     throws HiveException {
>   if (db != null) {
>     LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
>         ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
>     db.close();
>   }
>   closeCurrent();
>   if (c == null) {
>     c = createHiveConf();
>   }
>   c.set("fs.scheme.class", "dfs");
>   Hive newdb = new Hive(c, doRegisterAllFns);
>   hiveDB.set(newdb);
>   return newdb;
> }
> {code}
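> A stripped-down reproduction of the race (a hypothetical stand-in class, 
> not the real Hive object): one thread closes the shared instance the way 
> create() does, and the other thread then reads the cleaned-up state as null.
> {code}
> public class CloseSharedObjectRace {
>   // Hypothetical stand-in for Hive: close() wipes the state its conf holds.
>   static class FakeHive {
>     private volatile String validTxns = "txn-list"; // stands in for the VALID_TXNS_KEY value
>     void close() { validTxns = null; }
>     String getValidTxns() { return validTxns; }
>   }
> 
>   public static void main(String[] args) throws InterruptedException {
>     final FakeHive parentHive = new FakeHive(); // shared by both "async" threads
> 
>     // Mirrors create(): one thread closes the old shared object.
>     Thread moveTask = new Thread(parentHive::close, "move-task");
>     Thread otherQuery = new Thread(() -> {
>       try { Thread.sleep(50); } catch (InterruptedException ignored) { }
>       // The other async thread still holds the closed object and reads null.
>       System.out.println("VALID_TXNS seen by other thread: " + parentHive.getValidTxns());
>     }, "other-query");
> 
>     moveTask.start();
>     otherQuery.start();
>     moveTask.join();
>     otherQuery.join();
>   }
> }
> {code}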
> *Fix:*
> We shouldn't close the old Hive object while it is shared by multiple 
> threads; a flag on the Hive object can track whether it is shared.
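> One possible shape of that flag (hypothetical field and method names; the 
> actual patch may differ):
> {code}
> // Sketch only: the owner marks the Hive object as shared so close() becomes
> // a no-op for async threads, and only the owning session really closes it.
> public class Hive {
>   private boolean allowClose = true;
> 
>   public void setAllowClose(boolean allowClose) { this.allowClose = allowClose; }
> 
>   public void close() {
>     if (!allowClose) {
>       return; // shared across async threads; skip cleanup here
>     }
>     // ... release metastore connection, clear the thread-local slot, etc.
>   }
> }
> 
> // Caller side (e.g. before spawning async threads in SQLOperation):
> //   parentHive.setAllowClose(false);
> {code}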
> *Memory leak issue:*
> A memory leak occurs if one of the threads in Hive.loadDynamicPartitions 
> throws an exception. rawStoreMap stores the RawStore objects that have to be 
> cleaned up, but it is populated only on the success path; on the exception 
> path it is not, so the RawStore leaks (a sketch of a fix follows the 
> snippet below).
> {code}
> futures.add(pool.submit(new Callable<Void>() {
>   @Override
>   public Void call() throws Exception {
>     try {
>       // move file would require session details (needCopy() invokes SessionState.get)
>       SessionState.setCurrentSessionState(parentSession);
>       LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
> 
>       // load the partition
>       Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
>           true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
>           isInsertOverwrite);
>       partitionsMap.put(fullPartSpec, newPartition);
> 
>       if (inPlaceEligible) {
>         synchronized (ps) {
>           InPlaceUpdate.rePositionCursor(ps);
>           partitionsLoaded.incrementAndGet();
>           InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
>               + partsToLoad + " partitions.");
>         }
>       }
>       // Add embedded rawstore, so we can cleanup later to avoid memory leak
>       if (getMSC().isLocalMetaStore()) {
>         if (!rawStoreMap.containsKey(Thread.currentThread().getId())) {
>           rawStoreMap.put(Thread.currentThread().getId(), HiveMetaStore.HMSHandler.getRawStore());
>         }
>       }
>       return null;
>     } catch (Exception t) {
>       // NOTE: rawStoreMap is never populated on this path, so the thread's
>       // RawStore is leaked (exception handling elided in this excerpt).
>     }
> {code}
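> A hedged sketch of a fix for the leak: record the thread's RawStore in a 
> finally block so the exception path is covered too (names follow the 
> excerpt above; shown as a fragment of the Callable, not a standalone class):
> {code}
> public Void call() throws Exception {
>   try {
>     SessionState.setCurrentSessionState(parentSession);
>     // ... loadPartition(...) and progress reporting as above ...
>     return null;
>   } finally {
>     // Runs on both the success and the exception path, so the RawStore
>     // created by this pool thread is always recorded for later cleanup.
>     if (getMSC().isLocalMetaStore()
>         && !rawStoreMap.containsKey(Thread.currentThread().getId())) {
>       rawStoreMap.put(Thread.currentThread().getId(),
>           HiveMetaStore.HMSHandler.getRawStore());
>     }
>   }
> }
> {code}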


