Repository: hive

Updated Branches:
  refs/heads/branch-1 70f352728 -> 917fc8727
HIVE-11550 ACID queries pollute HiveConf (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/917fc872
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/917fc872
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/917fc872

Branch: refs/heads/branch-1
Commit: 917fc8727fb20d52ad8cc6baf5ebed535eeb743e
Parents: 70f3527
Author: Eugene Koifman <[email protected]>
Authored: Fri May 13 16:19:27 2016 -0700
Committer: Eugene Koifman <[email protected]>
Committed: Fri May 13 16:19:27 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/optimizer/Optimizer.java      |  2 +-
 .../correlation/AbstractCorrelationProcCtx.java  | 21 +++++++++++++++++++-
 .../hadoop/hive/ql/parse/ParseContext.java       | 17 ++++++++++++++--
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java   |  4 ++--
 .../hadoop/hive/ql/parse/TaskCompiler.java       |  2 +-
 5 files changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/917fc872/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 035a537..e52d7f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -159,7 +159,7 @@ public class Optimizer {
       transformations.add(new JoinReorder());
     }
 
-    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
+    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION) || pctx.hasAcidWrite()) {
      transformations.add(new ReduceSinkDeDuplication());
     }
     transformations.add(new NonBlockingOpDeDupProc());
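
The guard above is the visible half of the fix: rather than forcing deduplication for ACID writes by mutating the shared session HiveConf (the kind of per-query conf change the JIRA title calls pollution; the pre-patch call sites are not shown in this diff), the optimizer now only reads the conf and takes the ACID signal from the per-query ParseContext. A minimal sketch of the two styles, assuming a session-scoped HiveConf named sessionConf and a compiled ParseContext named pctx; the class and method names are illustrative, not part of the patch:

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.parse.ParseContext;

  class DeDupDecisionSketch {
    // Conf-mutation style: the override outlives the current query and is inherited
    // by every query compiled afterwards in the same session.
    static void forceViaConf(HiveConf sessionConf) {
      sessionConf.setBoolVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION, true);
      sessionConf.setIntVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER, 1);
    }

    // Per-query style used by the patch: the conf is only read, and the ACID-write
    // signal comes from the ParseContext built for this query alone.
    static boolean shouldDeDup(HiveConf sessionConf, ParseContext pctx) {
      return HiveConf.getBoolVar(sessionConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)
          || pctx.hasAcidWrite();
    }
  }

Keeping the conf read-only during compilation is what stops one ACID statement from changing how later, unrelated statements in the same session are optimized.
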
http://git-wip-us.apache.org/repos/asf/hive/blob/917fc872/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
index 5b673df..bb4ec55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java
@@ -28,8 +28,12 @@ import java.util.Set;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 abstract class AbstractCorrelationProcCtx implements NodeProcessorCtx {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractCorrelationProcCtx.class);
   private ParseContext pctx;
   // For queries using script, the optimization cannot be applied without user's confirmation
   // If script preserves alias and value for columns related to keys, user can set this true
@@ -45,7 +49,22 @@ abstract class AbstractCorrelationProcCtx implements NodeProcessorCtx {
   public AbstractCorrelationProcCtx(ParseContext pctx) {
     removedOps = new HashSet<Operator<?>>();
     trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST);
-    minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
+    if(pctx.hasAcidWrite()) {
+      StringBuilder tblNames = new StringBuilder();
+      for(FileSinkDesc fsd : pctx.getAcidSinks()) {
+        if(fsd.getTable() != null) {
+          tblNames.append(fsd.getTable().getDbName()).append('.').append(fsd.getTable().getTableName()).append(',');
+        }
+      }
+      if(tblNames.length() > 0) {
+        tblNames.setLength(tblNames.length() - 1);//trailing ','
+      }
+      LOG.info("Overriding " + HIVEOPTREDUCEDEDUPLICATIONMINREDUCER + " to 1 due to a write to transactional table(s) " + tblNames);
+      minReducer = 1;
+    }
+    else {
+      minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
+    }
     isMapAggr = pctx.getConf().getBoolVar(HIVEMAPSIDEAGGREGATE);
     this.pctx = pctx;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/917fc872/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index c77a642..e451a30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -107,7 +109,7 @@ public class ParseContext {
   private AnalyzeRewriteContext analyzeRewrite;
   private CreateTableDesc createTableDesc;
   private boolean reduceSinkAddedBySortedDynPartition;
-
+  private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
 
   public ParseContext() {
   }
@@ -166,7 +168,8 @@ public class ParseContext {
       Map<String, ReadEntity> viewAliasToInput,
       List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting,
       AnalyzeRewriteContext analyzeRewrite, CreateTableDesc createTableDesc,
-      QueryProperties queryProperties) {
+      QueryProperties queryProperties,
+      Set<FileSinkDesc> acidFileSinks) {
     this.conf = conf;
     this.opToPartPruner = opToPartPruner;
     this.opToPartList = opToPartList;
@@ -193,6 +196,16 @@ public class ParseContext {
     this.analyzeRewrite = analyzeRewrite;
     this.createTableDesc = createTableDesc;
     this.queryProperties = queryProperties;
+    if(acidFileSinks != null && !acidFileSinks.isEmpty()) {
+      this.acidFileSinks = new HashSet<>();
+      this.acidFileSinks.addAll(acidFileSinks);
+    }
+  }
+  public Set<FileSinkDesc> getAcidSinks() {
+    return acidFileSinks;
+  }
+  public boolean hasAcidWrite() {
+    return !acidFileSinks.isEmpty();
   }
 
   /**
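
ParseContext is now the carrier for the query's ACID file sinks: the constructor takes the set, copies it defensively into a private HashSet, and exposes it through getAcidSinks() and hasAcidWrite(), so optimizer passes can detect a transactional write without reading or writing HiveConf. A small consumer sketch that mirrors the table-name list built for the log message in AbstractCorrelationProcCtx above; the class and method names here are invented for illustration:

  import org.apache.hadoop.hive.ql.parse.ParseContext;
  import org.apache.hadoop.hive.ql.plan.FileSinkDesc;

  final class AcidSinkNamesSketch {
    // Builds a comma-separated "db.table" list for the query's ACID sinks,
    // or returns an empty string when the query performs no transactional write.
    static String describe(ParseContext pctx) {
      if (!pctx.hasAcidWrite()) {
        return "";
      }
      StringBuilder names = new StringBuilder();
      for (FileSinkDesc fsd : pctx.getAcidSinks()) {
        if (fsd.getTable() == null) {
          continue;
        }
        if (names.length() > 0) {
          names.append(',');
        }
        names.append(fsd.getTable().getDbName()).append('.').append(fsd.getTable().getTableName());
      }
      return names.toString();
    }
  }
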
http://git-wip-us.apache.org/repos/asf/hive/blob/917fc872/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 829de48..bcc42f7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -414,7 +414,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         listMapJoinOpsNoReducer, prunedPartitions, opToSamplePruner, globalLimitCtx,
         nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput,
         reduceSinkOperatorsAddedByEnforceBucketingSorting,
-        analyzeRewrite, tableDesc, queryProperties);
+        analyzeRewrite, tableDesc, queryProperties, acidFileSinks);
   }
 
   @SuppressWarnings("nls")
@@ -10208,7 +10208,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         listMapJoinOpsNoReducer, prunedPartitions, opToSamplePruner, globalLimitCtx,
         nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner, viewAliasToInput,
         reduceSinkOperatorsAddedByEnforceBucketingSorting,
-        analyzeRewrite, tableDesc, queryProperties);
+        analyzeRewrite, tableDesc, queryProperties, acidFileSinks);
 
     // 5. Take care of view creation
     if (createVwDesc != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/917fc872/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index ba11e41..ce7aeb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -401,7 +401,7 @@ public abstract class TaskCompiler {
         pCtx.getNameToSplitSample(), pCtx.getSemanticInputs(), rootTasks,
         pCtx.getOpToPartToSkewedPruner(), pCtx.getViewAliasToInput(),
         pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(),
-        pCtx.getAnalyzeRewrite(), pCtx.getCreateTable(), pCtx.getQueryProperties());
+        pCtx.getAnalyzeRewrite(), pCtx.getCreateTable(), pCtx.getQueryProperties(), pCtx.getAcidSinks());
     clone.setFetchTask(pCtx.getFetchTask());
     clone.setLineageInfo(pCtx.getLineageInfo());
     clone.setMapJoinOps(pCtx.getMapJoinOps());
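
The SemanticAnalyzer and TaskCompiler hunks are plumbing: the set of ACID file sinks collected during analysis is handed to every ParseContext that gets built, including the clone TaskCompiler creates, so all downstream passes see the same per-query view. With that in place, the minReducer override in AbstractCorrelationProcCtx becomes a pure function of the ParseContext instead of a sticky conf setting; a compact restatement of that rule as a sketch (the helper class and method are mine, not part of the patch):

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.parse.ParseContext;

  final class MinReducerSketch {
    // A write to a transactional table forces the dedup threshold to 1 for this query;
    // otherwise the configured HIVEOPTREDUCEDEDUPLICATIONMINREDUCER value applies.
    static int minReducersFor(ParseContext pctx) {
      return pctx.hasAcidWrite()
          ? 1
          : pctx.getConf().getIntVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
    }
  }
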
