[
https://issues.apache.org/jira/browse/HIVE-20627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sankar Hariappan reassigned HIVE-20627:
---------------------------------------
> Concurrent async queries from the same session intermittently fail with
> LockException.
> -----------------------------------------------------------------------------------
>
> Key: HIVE-20627
> URL: https://issues.apache.org/jira/browse/HIVE-20627
> Project: Hive
> Issue Type: Bug
> Components: HiveServer2
> Affects Versions: 4.0.0, 3.2.0
> Reporter: Sankar Hariappan
> Assignee: Sankar Hariappan
> Priority: Major
>
> When multiple async queries are executed from the same session, the async query
> execution DAGs end up sharing the single Hive object that the caller sets for all
> threads. When dynamic partitions are loaded, the MoveTask re-creates the Hive object
> and closes the shared one, which causes metastore connection failures in the other
> async execution threads that still use it. The same problem is seen when
> ReplDumpTask and ReplLoadTask are part of the DAG.
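> The failure pattern, reduced to its essence (plain Java, not Hive code): several
> worker threads hold a reference to one shared, closeable object, and one of them
> closes and replaces it while the others are still using it.
> {code}
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
>
> // Minimal illustration of the race described above (not Hive code).
> public class SharedHandleRace {
>   // Stands in for the shared Hive object and its metastore connection.
>   static class Handle implements AutoCloseable {
>     private volatile boolean closed = false;
>     void use() {
>       if (closed) {
>         throw new IllegalStateException("handle closed by another thread");
>       }
>     }
>     @Override public void close() { closed = true; }
>   }
>
>   public static void main(String[] args) throws Exception {
>     Handle parent = new Handle();   // the object handed to every async thread
>     ExecutorService pool = Executors.newFixedThreadPool(2);
>
>     // One async query keeps using the shared handle...
>     pool.submit(() -> {
>       for (int i = 0; i < 10_000_000; i++) {
>         parent.use();               // intermittently throws once the handle is closed
>       }
>       return null;
>     });
>
>     // ...while another "re-creates" it, closing the shared instance first,
>     // the way Hive.create() calls db.close() on the old object.
>     pool.submit(() -> {
>       parent.close();
>       return new Handle();
>     });
>
>     pool.shutdown();
>     pool.awaitTermination(10, TimeUnit.SECONDS);
>   }
> }
> {code}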
> *Root cause:*
> For async query execution launched from SQLOperation.runInternal, the thread-local
> Hive object of every child thread is set to parentHive
> (parentSession.getSessionHive()):
> {code}
> @Override
> public void run() {
>   PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
>     @Override
>     public Object run() throws HiveSQLException {
>       Hive.set(parentHive); // Setting parentHive for all async operations.
>       // TODO: can this result in cross-thread reuse of session state?
>       SessionState.setCurrentSessionState(parentSessionState);
>       PerfLogger.setPerfLogger(parentPerfLogger);
>       LogUtils.registerLoggingContext(queryState.getConf());
>       try {
>         if (asyncPrepare) {
>           prepare(queryState);
>         }
>         runQuery();
>       } catch (HiveSQLException e) {
>         // TODO: why do we invent our own error path op top of the one from Future.get?
>         setOperationException(e);
>         LOG.error("Error running hive query: ", e);
>       } finally {
>         LogUtils.unregisterLoggingContext();
>       }
>       return null;
>     }
>   };
> {code}
> Now, while async execution is in progress, if one of the threads re-creates the
> Hive object, it first closes the parentHive object. That breaks the other threads
> still using it: the conf object they refer to gets cleaned up, so the lookup of
> VALID_TXNS_KEY returns null and the query fails with LockException.
> {code}
> private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns)
>     throws HiveException {
>   if (db != null) {
>     LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
>         ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
>     // Closes the old Hive object, which may still be the parentHive shared
>     // by the other async execution threads of the same session.
>     db.close();
>   }
>   closeCurrent();
>   if (c == null) {
>     c = createHiveConf();
>   }
>   c.set("fs.scheme.class", "dfs");
>   Hive newdb = new Hive(c, doRegisterAllFns);
>   hiveDB.set(newdb);
>   return newdb;
> }
> {code}
> *Fix:*
> We shouldn't close the old Hive object if it is shared by multiple threads. A flag
> on the Hive object can be used to track whether it is shared.
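> A minimal sketch of that idea (the isShared field and setShared() setter are
> hypothetical names, not the committed patch): Hive.create() skips the close when the
> old Hive object is marked as shared by the session, leaving the owning session
> responsible for closing it during its own cleanup.
> {code}
> // Sketch only: guard the close with a hypothetical "shared" flag on the Hive object.
> private boolean isShared = false;
>
> public void setShared(boolean shared) {
>   this.isShared = shared;  // set by the session when handing the object to async threads
> }
>
> private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns)
>     throws HiveException {
>   if (db != null) {
>     if (db.isShared) {
>       // Shared with other async execution threads; the owning session closes it later.
>       LOG.debug("Not closing shared Hive object, needsRefresh = " + needsRefresh);
>     } else {
>       db.close();
>     }
>   }
>   closeCurrent();  // the same guard would apply to the thread-local object closed here
>   if (c == null) {
>     c = createHiveConf();
>   }
>   c.set("fs.scheme.class", "dfs");
>   Hive newdb = new Hive(c, doRegisterAllFns);
>   hiveDB.set(newdb);
>   return newdb;
> }
> {code}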
> *Memory leak issue:*
> A memory leak shows up if one of the threads spawned by Hive.loadDynamicPartitions
> throws an exception. rawStoreMap stores the RawStore objects that have to be cleaned
> up later, but it is populated only on the success path; on the exception path the
> RawStore is never registered and therefore never shut down, which leaks it. A
> possible fix is sketched after the snippet below.
> {code}
> futures.add(pool.submit(new Callable<Void>() {
>   @Override
>   public Void call() throws Exception {
>     try {
>       // move file would require session details (needCopy() invokes SessionState.get)
>       SessionState.setCurrentSessionState(parentSession);
>       LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
>       // load the partition
>       Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
>           true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
>           isInsertOverwrite);
>       partitionsMap.put(fullPartSpec, newPartition);
>       if (inPlaceEligible) {
>         synchronized (ps) {
>           InPlaceUpdate.rePositionCursor(ps);
>           partitionsLoaded.incrementAndGet();
>           InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
>               + partsToLoad + " partitions.");
>         }
>       }
>       // Add embedded rawstore, so we can cleanup later to avoid memory leak
>       if (getMSC().isLocalMetaStore()) {
>         if (!rawStoreMap.containsKey(Thread.currentThread().getId())) {
>           rawStoreMap.put(Thread.currentThread().getId(),
>               HiveMetaStore.HMSHandler.getRawStore());
>         }
>       }
>       return null;
>     } catch (Exception t) {
>       // Leak: rawStoreMap is not populated on this path, so this thread's
>       // RawStore is never shut down.
>     }
> {code}
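> One possible shape of the fix (a sketch using the same names as the snippet above,
> not the committed patch): register the thread's RawStore in a finally block so it is
> captured on both the success and the exception path and can be shut down later.
> {code}
> futures.add(pool.submit(new Callable<Void>() {
>   @Override
>   public Void call() throws Exception {
>     try {
>       SessionState.setCurrentSessionState(parentSession);
>       Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, loadFileType,
>           true, false, numLB > 0, false, isAcid, hasFollowingStatsTask, writeId, stmtId,
>           isInsertOverwrite);
>       partitionsMap.put(fullPartSpec, newPartition);
>       return null;
>     } finally {
>       // Runs on both the success and the exception path, so the RawStore created by
>       // this thread is always recorded and can be cleaned up after the load finishes.
>       if (getMSC().isLocalMetaStore()
>           && !rawStoreMap.containsKey(Thread.currentThread().getId())) {
>         rawStoreMap.put(Thread.currentThread().getId(),
>             HiveMetaStore.HMSHandler.getRawStore());
>       }
>     }
>   }
> }));
> {code}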
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)