Repository: hive
Updated Branches:
  refs/heads/branch-3 81a4bdd75 -> c085aaa58
Revert "HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)" This reverts commit 86aeb53645d12a78a4ddcbe0df2205115e6bf4f4. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c085aaa5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c085aaa5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c085aaa5 Branch: refs/heads/branch-3 Commit: c085aaa58b565873d40e547936aae6618c89ba91 Parents: 81a4bdd Author: Jesus Camacho Rodriguez <[email protected]> Authored: Mon Jun 11 17:15:10 2018 -0700 Committer: Jesus Camacho Rodriguez <[email protected]> Committed: Mon Jun 11 17:15:10 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hive/ql/Context.java | 43 ---- .../java/org/apache/hadoop/hive/ql/Driver.java | 201 +++---------------- .../apache/hadoop/hive/ql/exec/FetchTask.java | 1 - 3 files changed, 23 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c085aaa5/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 9eda4ed..1921ea7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -291,7 +291,6 @@ public class Context { public DestClausePrefix addDestNamePrefix(int pos, DestClausePrefix prefix) { return insertBranchToNamePrefix.put(pos, prefix); } - public Context(Configuration conf) throws IOException { this(conf, generateExecutionId()); } @@ -316,48 +315,6 @@ public class Context { viewsTokenRewriteStreams = new HashMap<>(); } - protected Context(Context ctx) { - // This method creates a deep copy of 
context, but the copy is partial, - // hence it needs to be used carefully. In particular, following objects - // are ignored: - // opContext, pathToCS, cboInfo, cboSucceeded, tokenRewriteStream, viewsTokenRewriteStreams, - // rewrittenStatementContexts, cteTables, loadTableOutputMap, planMapper, insertBranchToNamePrefix - this.isHDFSCleanup = ctx.isHDFSCleanup; - this.resFile = ctx.resFile; - this.resDir = ctx.resDir; - this.resFs = ctx.resFs; - this.resDirPaths = ctx.resDirPaths; - this.resDirFilesNum = ctx.resDirFilesNum; - this.initialized = ctx.initialized; - this.originalTracker = ctx.originalTracker; - this.nonLocalScratchPath = ctx.nonLocalScratchPath; - this.localScratchDir = ctx.localScratchDir; - this.scratchDirPermission = ctx.scratchDirPermission; - this.fsScratchDirs.putAll(ctx.fsScratchDirs); - this.conf = ctx.conf; - this.pathid = ctx.pathid; - this.explainConfig = ctx.explainConfig; - this.cmd = ctx.cmd; - this.executionId = ctx.executionId; - this.hiveLocks = ctx.hiveLocks; - this.hiveTxnManager = ctx.hiveTxnManager; - this.needLockMgr = ctx.needLockMgr; - this.sequencer = ctx.sequencer; - this.outputLockObjects.putAll(ctx.outputLockObjects); - this.stagingDir = ctx.stagingDir; - this.heartbeater = ctx.heartbeater; - this.skipTableMasking = ctx.skipTableMasking; - this.isUpdateDeleteMerge = ctx.isUpdateDeleteMerge; - this.isLoadingMaterializedView = ctx.isLoadingMaterializedView; - this.operation = ctx.operation; - this.wmContext = ctx.wmContext; - this.isExplainPlan = ctx.isExplainPlan; - this.statsSource = ctx.statsSource; - this.executionIndex = ctx.executionIndex; - this.viewsTokenRewriteStreams = new HashMap<>(); - this.rewrittenStatementContexts = new HashSet<>(); - } - public Map<String, Path> getFsScratchDirs() { return fsScratchDirs; } http://git-wip-us.apache.org/repos/asf/hive/blob/c085aaa5/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 8b5262a..08f9a67 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -39,16 +39,15 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; @@ -61,11 +60,11 @@ import org.apache.hadoop.hive.conf.HiveVariableSource; import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.metastore.ColumnType; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; -import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.cache.results.CacheUsage; import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry; @@ -93,7 +92,6 @@ import 
org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; -import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -143,7 +141,6 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hive.common.util.ShutdownHookManager; -import org.apache.hive.common.util.TxnIdUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -218,9 +215,6 @@ public class Driver implements IDriver { private CacheEntry usedCacheEntry; private ValidWriteIdList compactionWriteIds = null; - private Context backupContext = null; - private boolean retrial = false; - private enum DriverState { INITIALIZED, COMPILING, @@ -631,11 +625,10 @@ public class Driver implements IDriver { // because at that point we need access to the objects. 
Hive.get().getMSC().flushCache(); - backupContext = new Context(ctx); - boolean executeHooks = hookRunner.hasPreAnalyzeHooks(); - - HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl(); - if (executeHooks) { + BaseSemanticAnalyzer sem; + // Do semantic analysis and plan generation + if (hookRunner.hasPreAnalyzeHooks()) { + HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl(); hookCtx.setConf(conf); hookCtx.setUserName(userName); hookCtx.setIpAddress(SessionState.get().getUserIpAddress()); @@ -643,24 +636,24 @@ public class Driver implements IDriver { hookCtx.setHiveOperation(queryState.getHiveOperation()); tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree); - } - - // Do semantic analysis and plan generation - BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree); - - if (!retrial) { + sem = SemanticAnalyzerFactory.get(queryState, tree); openTransaction(); + // TODO: Lock acquisition should be moved before this method call + // when we want to implement lock-based concurrency control generateValidTxnList(); - } - - sem.analyze(tree, ctx); - - if (executeHooks) { + sem.analyze(tree, ctx); hookCtx.update(sem); + hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks()); + } else { + sem = SemanticAnalyzerFactory.get(queryState, tree); + openTransaction(); + // TODO: Lock acquisition should be moved before this method call + // when we want to implement lock-based concurrency control + generateValidTxnList(); + sem.analyze(tree, ctx); } - - LOG.info("Semantic Analysis Completed (retrial = {})", retrial); + LOG.info("Semantic Analysis Completed"); // Retrieve information about cache usage for the query. 
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) { @@ -678,6 +671,7 @@ public class Driver implements IDriver { plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema); + conf.set("mapreduce.workflow.id", "hive_" + queryId); conf.set("mapreduce.workflow.name", queryStr); @@ -780,86 +774,6 @@ public class Driver implements IDriver { } } - // Checks whether txn list has been invalidated while planning the query. - // This would happen if query requires exclusive/semi-shared lock, and there - // has been a committed transaction on the table over which the lock is - // required. - private boolean isValidTxnListState() throws LockException { - // 1) Get valid txn list. - String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); - if (txnString == null) { - // Not a transactional op, nothing more to do - return true; - } - ValidTxnList currentTxnList = queryTxnMgr.getValidTxns(); - String currentTxnString = currentTxnList.toString(); - if (currentTxnString.equals(txnString)) { - // Still valid, nothing more to do - return true; - } - // 2) Get locks that are relevant: - // - Exclusive for INSERT OVERWRITE. - // - Semi-shared for UPDATE/DELETE. 
- if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) { - // Nothing to check - return true; - } - Set<String> nonSharedLocks = new HashSet<>(); - for (HiveLock lock : ctx.getHiveLocks()) { - if (lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE || - lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) { - if (lock.getHiveLockObject().getPaths().length == 2) { - // Pos 0 of lock paths array contains dbname, pos 1 contains tblname - nonSharedLocks.add( - Warehouse.getQualifiedName( - lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1])); - } - } - } - // 3) Get txn tables that are being written - ValidTxnWriteIdList txnWriteIdList = - new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); - if (txnWriteIdList == null) { - // Nothing to check - return true; - } - List<Pair<String, Table>> writtenTables = getWrittenTableList(plan); - ValidTxnWriteIdList currentTxnWriteIds = - queryTxnMgr.getValidWriteIds( - writtenTables.stream() - .filter(e -> AcidUtils.isTransactionalTable(e.getRight())) - .map(e -> e.getLeft()) - .collect(Collectors.toList()), - currentTxnString); - for (Pair<String, Table> tableInfo : writtenTables) { - String fullQNameForLock = Warehouse.getQualifiedName( - tableInfo.getRight().getDbName(), - MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName())); - if (nonSharedLocks.contains(fullQNameForLock)) { - // Check if table is transactional - if (AcidUtils.isTransactionalTable(tableInfo.getRight())) { - // Check that write id is still valid - if (!TxnIdUtils.checkEquivalentWriteIds( - txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()), - currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) { - // Write id has changed, it is not valid anymore, - // we need to recompile - return false; - } - } - nonSharedLocks.remove(fullQNameForLock); - } - } - if (!nonSharedLocks.isEmpty()) { - throw new LockException("Wrong state: non-shared locks contain information 
for tables that have not" + - " been visited when trying to validate the locks from query tables.\n" + - "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" + - "Remaining locks after check: " + nonSharedLocks); - } - // It passes the test, it is valid - return true; - } - private void setTriggerContext(final String queryId) { final long queryStartTime; // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not @@ -1478,34 +1392,6 @@ public class Driver implements IDriver { tableList.add(fullTableName); } - // Make the list of transactional tables list which are getting written by current txn - private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) { - List<Pair<String, Table>> result = new ArrayList<>(); - Set<String> tableList = new HashSet<>(); - for (WriteEntity output : plan.getOutputs()) { - Table tbl; - switch (output.getType()) { - case TABLE: { - tbl = output.getTable(); - break; - } - case PARTITION: - case DUMMYPARTITION: { - tbl = output.getPartition().getTable(); - break; - } - default: { - continue; - } - } - String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName()); - if (tableList.add(fullTableName)) { - result.add(new ImmutablePair(fullTableName, tbl)); - } - } - return result; - } - private String getUserFromUGI() { // Don't use the userName member, as it may or may not have been set. Get the value from // conf, which calls into getUGI to figure out who the process is running as. 
@@ -1576,6 +1462,7 @@ public class Driver implements IDriver { acidDdlDesc.setWriteId(writeId); } + /*It's imperative that {@code acquireLocks()} is called for all commands so that HiveTxnManager can transition its state machine correctly*/ queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState); @@ -1936,48 +1823,6 @@ public class Driver implements IDriver { lockAndRespond(); try { - if (!isValidTxnListState()) { - // Snapshot was outdated when locks were acquired, hence regenerate context, - // txn list and retry - // TODO: Lock acquisition should be moved before analyze, this is a bit hackish. - // Currently, we acquire a snapshot, we compile the query wrt that snapshot, - // and then, we acquire locks. If snapshot is still valid, we continue as usual. - // But if snapshot is not valid, we recompile the query. - retrial = true; - backupContext.addRewrittenStatementContext(ctx); - backupContext.setHiveLocks(ctx.getHiveLocks()); - ctx = backupContext; - conf.set(ValidTxnList.VALID_TXNS_KEY, queryTxnMgr.getValidTxns().toString()); - if (plan.hasAcidResourcesInQuery()) { - recordValidWriteIds(queryTxnMgr); - } - - if (!alreadyCompiled) { - // compile internal will automatically reset the perf logger - compileInternal(command, true); - } else { - // Since we're reusing the compiled plan, we need to update its start time for current run - plan.setQueryStartTime(queryDisplay.getQueryStartTime()); - } - - if (!isValidTxnListState()) { - // Throw exception - throw handleHiveException(new HiveException("Operation could not be executed"), 14); - } - - //Reset the PerfLogger - perfLogger = SessionState.getPerfLogger(true); - - // the reason that we set the txn manager for the cxt here is because each - // query has its own ctx object. The txn mgr is shared across the - // same instance of Driver, which can run multiple queries. 
- ctx.setHiveTxnManager(queryTxnMgr); - } - } catch (LockException e) { - throw handleHiveException(e, 13); - } - - try { execute(); } catch (CommandProcessorResponse cpr) { rollback(cpr); http://git-wip-us.apache.org/repos/asf/hive/blob/c085aaa5/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index caa9d83..e555aec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -59,7 +59,6 @@ public class FetchTask extends Task<FetchWork> implements Serializable { public void setValidWriteIdList(String writeIdStr) { fetch.setValidWriteIdList(writeIdStr); } - @Override public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx, CompilationOpContext opContext) {
