http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/gen/thrift/gen-rb/queryplan_constants.rb ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-rb/queryplan_constants.rb b/ql/src/gen/thrift/gen-rb/queryplan_constants.rb index 8080b3f..428185e 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_constants.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_constants.rb @@ -1,5 +1,5 @@ # -# Autogenerated by Thrift Compiler (0.9.0) +# Autogenerated by Thrift Compiler (0.9.2) # # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING #
http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/gen/thrift/gen-rb/queryplan_types.rb ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index c2c4220..1a22f07 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -1,5 +1,5 @@ # -# Autogenerated by Thrift Compiler (0.9.0) +# Autogenerated by Thrift Compiler (0.9.2) # # DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING # @@ -46,8 +46,9 @@ module OperatorType ORCFILEMERGE = 22 RCFILEMERGE = 23 MERGEJOIN = 24 - VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE", 24 => "MERGEJOIN"} - VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE, MERGEJOIN]).freeze + SPARKPRUNINGSINK = 25 + VALUE_MAP = {0 => "JOIN", 1 => "MAPJOIN", 2 => "EXTRACT", 3 => "FILTER", 4 => "FORWARD", 5 => "GROUPBY", 6 => "LIMIT", 7 => "SCRIPT", 8 => "SELECT", 9 => "TABLESCAN", 10 => "FILESINK", 11 => "REDUCESINK", 12 => "UNION", 13 => "UDTF", 14 => "LATERALVIEWJOIN", 15 => "LATERALVIEWFORWARD", 16 => "HASHTABLESINK", 17 => "HASHTABLEDUMMY", 18 => "PTF", 19 => "MUX", 20 => "DEMUX", 21 => "EVENT", 22 => "ORCFILEMERGE", 23 => "RCFILEMERGE", 24 => "MERGEJOIN", 25 => "SPARKPRUNINGSINK"} + VALID_VALUES = Set.new([JOIN, MAPJOIN, EXTRACT, FILTER, FORWARD, GROUPBY, LIMIT, SCRIPT, SELECT, TABLESCAN, FILESINK, REDUCESINK, UNION, UDTF, LATERALVIEWJOIN, LATERALVIEWFORWARD, HASHTABLESINK, HASHTABLEDUMMY, PTF, MUX, DEMUX, EVENT, ORCFILEMERGE, RCFILEMERGE, MERGEJOIN, SPARKPRUNINGSINK]).freeze end module TaskType http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index f58a10b..ff58741 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -34,8 +34,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; +import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc; import org.apache.hadoop.hive.ql.plan.CollectDesc; import 
org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; @@ -117,6 +120,8 @@ public final class OperatorFactory { AppMasterEventOperator.class)); opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class, AppMasterEventOperator.class)); + opvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(SparkPartitionPruningSinkDesc.class, + SparkPartitionPruningSinkOperator.class)); opvec.add(new OpTuple<RCFileMergeDesc>(RCFileMergeDesc.class, RCFileMergeOperator.class)); opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class, @@ -133,6 +138,9 @@ public final class OperatorFactory { VectorAppMasterEventOperator.class)); vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class, VectorAppMasterEventOperator.class)); + vectorOpvec.add(new OpTuple<SparkPartitionPruningSinkDesc>( + SparkPartitionPruningSinkDesc.class, + VectorSparkPartitionPruningSinkOperator.class)); vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class)); vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class)); vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class)); http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 21398d8..007db75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.apache.spark.SparkConf; import org.apache.spark.SparkException; @@ -148,11 +149,11 @@ public class HiveSparkClientFactory { Set<String> classes = Sets.newHashSet( Splitter.on(",").trimResults().omitEmptyStrings().split( Strings.nullToEmpty(sparkConf.get("spark.kryo.classesToRegister")))); + classes.add(Writable.class.getName()); classes.add(VectorizedRowBatch.class.getName()); classes.add(BytesWritable.class.getName()); classes.add(HiveKey.class.getName()); - sparkConf.put( - "spark.kryo.classesToRegister", Joiner.on(",").join(classes)); + sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes)); return sparkConf; } http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java new file mode 100644 index 0000000..52913e0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.clearspring.analytics.util.Preconditions; +import javolution.testing.AssertionException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * The spark version of DynamicPartitionPruner. 
+ */ +public class SparkDynamicPartitionPruner { + private static final Log LOG = LogFactory.getLog(SparkDynamicPartitionPruner.class); + private final Map<String, List<SourceInfo>> sourceInfoMap = new LinkedHashMap<String, List<SourceInfo>>(); + private final BytesWritable writable = new BytesWritable(); + + public void prune(MapWork work, JobConf jobConf) throws HiveException, SerDeException { + sourceInfoMap.clear(); + initialize(work, jobConf); + if (sourceInfoMap.size() == 0) { + // Nothing to prune for this MapWork + return; + } + processFiles(work, jobConf); + prunePartitions(work); + } + + public void initialize(MapWork work, JobConf jobConf) throws SerDeException { + Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>(); + Set<String> sourceWorkIds = work.getEventSourceTableDescMap().keySet(); + + for (String id : sourceWorkIds) { + List<TableDesc> tables = work.getEventSourceTableDescMap().get(id); + List<String> columnNames = work.getEventSourceColumnNameMap().get(id); + List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(id); + + Iterator<String> cit = columnNames.iterator(); + Iterator<ExprNodeDesc> pit = partKeyExprs.iterator(); + for (TableDesc t : tables) { + String columnName = cit.next(); + ExprNodeDesc partKeyExpr = pit.next(); + SourceInfo si = new SourceInfo(t, partKeyExpr, columnName, jobConf); + if (!sourceInfoMap.containsKey(id)) { + sourceInfoMap.put(id, new ArrayList<SourceInfo>()); + } + sourceInfoMap.get(id).add(si); + + // We could have multiple sources restrict the same column, need to take + // the union of the values in that case. + if (columnMap.containsKey(columnName)) { + si.values = columnMap.get(columnName).values; + } + columnMap.put(columnName, si); + } + } + } + + private void processFiles(MapWork work, JobConf jobConf) throws HiveException { + ObjectInputStream in = null; + try { + Path baseDir = work.getTmpPathForPartitionPruning(); + FileSystem fs = FileSystem.get(baseDir.toUri(), jobConf); + + // Find the SourceInfo to put values in. 
+ for (String name : sourceInfoMap.keySet()) { + Path sourceDir = new Path(baseDir, name); + for (FileStatus fstatus : fs.listStatus(sourceDir)) { + LOG.info("Start processing pruning file: " + fstatus.getPath()); + in = new ObjectInputStream(fs.open(fstatus.getPath())); + String columnName = in.readUTF(); + SourceInfo info = null; + + for (SourceInfo si : sourceInfoMap.get(name)) { + if (columnName.equals(si.columnName)) { + info = si; + break; + } + } + + Preconditions.checkArgument(info != null, + "AssertionError: no source info for the column: " + columnName); + + // Read fields + while (in.available() > 0) { + writable.readFields(in); + + Object row = info.deserializer.deserialize(writable); + Object value = info.soi.getStructFieldData(row, info.field); + value = ObjectInspectorUtils.copyToStandardObject(value, info.fieldInspector); + info.values.add(value); + } + } + } + } catch (Exception e) { + throw new HiveException(e); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (IOException e) { + throw new HiveException("error while trying to close input stream", e); + } + } + } + + private void prunePartitions(MapWork work) throws HiveException { + for (String source : sourceInfoMap.keySet()) { + for (SourceInfo info : sourceInfoMap.get(source)) { + prunePartitionSingleSource(info, work); + } + } + } + + private void prunePartitionSingleSource(SourceInfo info, MapWork work) + throws HiveException { + Set<Object> values = info.values; + String columnName = info.columnName; + + ObjectInspector oi = + PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory + .getPrimitiveTypeInfo(info.fieldInspector.getTypeName())); + + ObjectInspectorConverters.Converter converter = + ObjectInspectorConverters.getConverter( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi); + + StructObjectInspector soi = + ObjectInspectorFactory.getStandardStructObjectInspector( + Collections.singletonList(columnName), Collections.singletonList(oi)); + + @SuppressWarnings("rawtypes") + ExprNodeEvaluator eval = ExprNodeEvaluatorFactory.get(info.partKey); + eval.initialize(soi); + + applyFilterToPartitions(work, converter, eval, columnName, values); + } + + private void applyFilterToPartitions( + MapWork work, + ObjectInspectorConverters.Converter converter, + ExprNodeEvaluator eval, + String columnName, + Set<Object> values) throws HiveException { + + Object[] row = new Object[1]; + + Iterator<String> it = work.getPathToPartitionInfo().keySet().iterator(); + while (it.hasNext()) { + String p = it.next(); + PartitionDesc desc = work.getPathToPartitionInfo().get(p); + Map<String, String> spec = desc.getPartSpec(); + if (spec == null) { + throw new AssertionException("No partition spec found in dynamic pruning"); + } + + String partValueString = spec.get(columnName); + if (partValueString == null) { + throw new AssertionException("Could not find partition value for column: " + columnName); + } + + Object partValue = converter.convert(partValueString); + if (LOG.isDebugEnabled()) { + LOG.debug("Converted partition value: " + partValue + " original (" + partValueString + ")"); + } + + row[0] = partValue; + partValue = eval.evaluate(row); + if (LOG.isDebugEnabled()) { + LOG.debug("part key expr applied: " + partValue); + } + + if (!values.contains(partValue)) { + LOG.info("Pruning path: " + p); + it.remove(); + work.getPathToAliases().remove(p); + work.getPaths().remove(p); + work.getPartitionDescs().remove(desc); + } + } + } + + @SuppressWarnings("deprecation") + 
private static class SourceInfo { + final ExprNodeDesc partKey; + final Deserializer deserializer; + final StructObjectInspector soi; + final StructField field; + final ObjectInspector fieldInspector; + Set<Object> values = new HashSet<Object>(); + final String columnName; + + SourceInfo(TableDesc table, ExprNodeDesc partKey, String columnName, JobConf jobConf) + throws SerDeException { + this.partKey = partKey; + this.columnName = columnName; + + deserializer = ReflectionUtils.newInstance(table.getDeserializerClass(), null); + deserializer.initialize(jobConf, table.getProperties()); + + ObjectInspector inspector = deserializer.getObjectInspector(); + if (LOG.isDebugEnabled()) { + LOG.debug("Type of obj insp: " + inspector.getTypeName()); + } + + soi = (StructObjectInspector) inspector; + List<? extends StructField> fields = soi.getAllStructFieldRefs(); + assert(fields.size() == 1) : "expecting single field in input"; + + field = fields.get(0); + fieldInspector = + ObjectInspectorUtils.getStandardObjectInspector(field.getFieldObjectInspector()); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index ca0ffb6..cf2c3bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -23,16 +23,21 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; import java.util.UUID; +import com.google.common.base.Preconditions; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.io.BytesWritable; import org.apache.spark.Dependency; @@ -158,4 +163,54 @@ public class SparkUtilities { } } } + + /** + * Generate a temporary path for dynamic partition pruning in the Spark branch. + * TODO: no longer need this if we use accumulator! + * @param basePath + * @param id + * @return + */ + public static Path generateTmpPathForPartitionPruning(Path basePath, String id) { + return new Path(basePath, id); + } + + /** + * Return the ID for this BaseWork, in String form.
+ * @param work the input BaseWork + * @return the unique ID for this BaseWork + */ + public static String getWorkId(BaseWork work) { + String workName = work.getName(); + return workName.substring(workName.indexOf(" ") + 1); + } + + public static SparkTask createSparkTask(HiveConf conf) { + return (SparkTask) TaskFactory.get( + new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf); + } + + public static SparkTask createSparkTask(SparkWork work, HiveConf conf) { + return (SparkTask) TaskFactory.get(work, conf); + } + + /** + * Recursively find all operators under root that are of class clazz, and + * put them in result. + * @param result all operators under root that are of class clazz + * @param root the root operator under which all operators will be examined + * @param clazz class to collect. Must NOT be null. + */ + public static void collectOp(Collection<Operator<?>> result, Operator<?> root, Class<?> clazz) { + Preconditions.checkArgument(clazz != null, "AssertionError: clazz should not be null"); + if (root == null) { + return; + } + if (clazz.equals(root.getClass())) { + result.add(root); + } + for (Operator<?> child : root.getChildOperators()) { + collectOp(result, child, clazz); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java new file mode 100644 index 0000000..3bce49d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.util.Collection; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; +import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; + +/** + * Vectorized version for SparkPartitionPruningSinkOperator. + * Forked from VectorAppMasterEventOperator.
+ **/ +public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruningSinkOperator { + + private static final long serialVersionUID = 1L; + + private VectorizationContext vContext; + + protected transient boolean firstBatch; + + protected transient VectorExtractRowDynBatch vectorExtractRowDynBatch; + + protected transient Object[] singleRow; + + public VectorSparkPartitionPruningSinkOperator(VectorizationContext context, + OperatorDesc conf) { + super(); + this.conf = (SparkPartitionPruningSinkDesc) conf; + this.vContext = context; + } + + public VectorSparkPartitionPruningSinkOperator() { + } + + @Override + public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + inputObjInspectors[0] = + VectorizedBatchUtil.convertToStandardStructObjectInspector( + (StructObjectInspector) inputObjInspectors[0]); + Collection<Future<?>> result = super.initializeOp(hconf); + assert result.isEmpty(); + + firstBatch = true; + + return result; + } + + @Override + public void process(Object data, int tag) throws HiveException { + VectorizedRowBatch batch = (VectorizedRowBatch) data; + if (firstBatch) { + vectorExtractRowDynBatch = new VectorExtractRowDynBatch(); + vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], + vContext.getProjectedColumns()); + singleRow = new Object[vectorExtractRowDynBatch.getCount()]; + firstBatch = false; + } + + vectorExtractRowDynBatch.setBatchOnEntry(batch); + ObjectInspector rowInspector = inputObjInspectors[0]; + try { + Writable writableRow; + for (int logical = 0; logical < batch.size; logical++) { + int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical; + vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + writableRow = serializer.serialize(singleRow, rowInspector); + writableRow.write(buffer); + } + } catch (Exception e) { + throw new HiveException(e); + } + + vectorExtractRowDynBatch.forgetBatchOnExit(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 1de7e40..e13c4dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -514,34 +515,40 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM, (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD)); int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads); - LOG.info("Total number of paths: " + paths.length + - ", launching " + numThreads + " threads to check non-combinable ones."); - ExecutorService executor = Executors.newFixedThreadPool(numThreads); - List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads); - try { - for (int i = 0; i < numThreads; i++) { - int start = i * numPathPerThread; - int length = i != numThreads - 1 ? 
numPathPerThread : paths.length - start; - futureList.add(executor.submit( - new CheckNonCombinablePathCallable(paths, start, length, job))); - } - Set<Integer> nonCombinablePathIndices = new HashSet<Integer>(); - for (Future<Set<Integer>> future : futureList) { - nonCombinablePathIndices.addAll(future.get()); - } - for (int i = 0; i < paths.length; i++) { - if (nonCombinablePathIndices.contains(i)) { - nonCombinablePaths.add(paths[i]); - } else { - combinablePaths.add(paths[i]); + + // This check is necessary because for Spark branch, the result array from + // getInputPaths() above could be empty, and therefore numThreads could be 0. + // In that case, Executors.newFixedThreadPool will fail. + if (numThreads > 0) { + LOG.info("Total number of paths: " + paths.length + + ", launching " + numThreads + " threads to check non-combinable ones."); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads); + try { + for (int i = 0; i < numThreads; i++) { + int start = i * numPathPerThread; + int length = i != numThreads - 1 ? numPathPerThread : paths.length - start; + futureList.add(executor.submit( + new CheckNonCombinablePathCallable(paths, start, length, job))); + } + Set<Integer> nonCombinablePathIndices = new HashSet<Integer>(); + for (Future<Set<Integer>> future : futureList) { + nonCombinablePathIndices.addAll(future.get()); + } + for (int i = 0; i < paths.length; i++) { + if (nonCombinablePathIndices.contains(i)) { + nonCombinablePaths.add(paths[i]); + } else { + combinablePaths.add(paths[i]); + } } + } catch (Exception e) { + LOG.error("Error checking non-combinable path", e); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); + throw new IOException(e); + } finally { + executor.shutdownNow(); } - } catch (Exception e) { - LOG.error("Error checking non-combinable path", e); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); - throw new IOException(e); - } finally { - executor.shutdownNow(); } // Store the previous value for the path specification http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 2ff3951..fd16b35 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.Map.Entry; @@ -37,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -266,6 +268,18 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } else { mrwork = Utilities.getMapWork(job); } + + // Prune partitions + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark") + && 
HiveConf.getBoolVar(job, HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) { + SparkDynamicPartitionPruner pruner = new SparkDynamicPartitionPruner(); + try { + pruner.prune(mrwork, job); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + pathToPartitionInfo = mrwork.getPathToPartitionInfo(); } } @@ -309,18 +323,28 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } Path[] getInputPaths(JobConf job) throws IOException { - Path[] dirs = FileInputFormat.getInputPaths(job); - if (dirs.length == 0) { - // on tez we're avoiding to duplicate the file info in FileInputFormat. - if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { - try { - List<Path> paths = Utilities.getInputPathsTez(job, mrwork); - dirs = paths.toArray(new Path[paths.size()]); - } catch (Exception e) { - throw new IOException("Could not create input files", e); + Path[] dirs; + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + Set<String> pathStrings = mrwork.getPathToPartitionInfo().keySet(); + dirs = new Path[pathStrings.size()]; + Iterator<String> it = pathStrings.iterator(); + for (int i = 0; i < dirs.length; i++) { + dirs[i] = new Path(it.next()); + } + } else { + dirs = FileInputFormat.getInputPaths(job); + if (dirs.length == 0) { + // on Tez we avoid duplicating the file info in FileInputFormat. + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + try { + List<Path> paths = Utilities.getInputPathsTez(job, mrwork); + dirs = paths.toArray(new Path[paths.size()]); + } catch (Exception e) { + throw new IOException("Could not create input files", e); + } + } else { + throw new IOException("No input paths specified in job"); } - } else { - throw new IOException("No input paths specified in job"); } } return dirs; http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java index 8546d21..f475926 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java @@ -48,10 +48,12 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -152,15 +154,24 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object...
nodeOutputs) throws SemanticException { - OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx; - ParseContext parseContext = context.parseContext; + ParseContext parseContext; + if (procCtx instanceof OptimizeTezProcContext) { + parseContext = ((OptimizeTezProcContext) procCtx).parseContext; + } else if (procCtx instanceof OptimizeSparkProcContext) { + parseContext = ((OptimizeSparkProcContext) procCtx).getParseContext(); + } else { + throw new IllegalArgumentException("expected procCtx to be either " + + "OptimizeTezProcContext or OptimizeSparkProcContext, but found " + + procCtx.getClass().getName()); + } FilterOperator filter = (FilterOperator) nd; FilterDesc desc = filter.getConf(); TableScanOperator ts = null; - if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) { + if (!parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING) && + !parseContext.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) { // nothing to do when the optimization is off return null; } @@ -311,14 +322,25 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { groupByOp.setColumnExprMap(colMap); // finally add the event broadcast operator - DynamicPruningEventDesc eventDesc = new DynamicPruningEventDesc(); - eventDesc.setTableScan(ts); - eventDesc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils - .getFieldSchemasFromColumnList(keyExprs, "key"))); - eventDesc.setTargetColumnName(column); - eventDesc.setPartKey(partKey); - - OperatorFactory.getAndMakeChild(eventDesc, groupByOp); + if (HiveConf.getVar(parseContext.getConf(), + ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + DynamicPruningEventDesc eventDesc = new DynamicPruningEventDesc(); + eventDesc.setTableScan(ts); + eventDesc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils + .getFieldSchemasFromColumnList(keyExprs, "key"))); + eventDesc.setTargetColumnName(column); + eventDesc.setPartKey(partKey); + OperatorFactory.getAndMakeChild(eventDesc, groupByOp); + } else { + // Must be spark branch + SparkPartitionPruningSinkDesc desc = new SparkPartitionPruningSinkDesc(); + desc.setTableScan(ts); + desc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils + .getFieldSchemasFromColumnList(keyExprs, "key"))); + desc.setTargetColumnName(column); + desc.setPartKey(partKey); + OperatorFactory.getAndMakeChild(desc, groupByOp); + } } private Map<Node, Object> walkExprTree(ExprNodeDesc pred, NodeProcessorCtx ctx) http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java index a7cf8b7..e6db133 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java @@ -167,7 +167,7 @@ public class Optimizer { if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) { transformations.add(new StatsOptimizer()); } - if (isSparkExecEngine || (pctx.getContext().getExplain() && !isTezExecEngine)) { + if (pctx.getContext().getExplain() && !isTezExecEngine && !isSparkExecEngine) { transformations.add(new AnnotateWithStatistics()); transformations.add(new AnnotateWithOpTraits()); }
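The reader loop in SparkDynamicPartitionPruner.processFiles() above (in.readUTF() for the column name, then writable.readFields(in) until the stream is drained) implies a simple format for each pruning file: a UTF-encoded column name followed by rows framed as BytesWritable. Below is a minimal writer sketch matching that reader; the class and method names are illustrative only, and the actual producer is SparkPartitionPruningSinkOperator, whose body is not part of this diff.

import java.io.ObjectOutputStream;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;

public final class PruningFileWriterSketch {
  // Hypothetical helper, not part of this patch: writes one pruning file in
  // the format consumed by SparkDynamicPartitionPruner.processFiles().
  static void writePruningFile(FileSystem fs, Path file, String columnName,
      List<byte[]> serializedRows) throws Exception {
    ObjectOutputStream out = new ObjectOutputStream(fs.create(file));
    try {
      out.writeUTF(columnName);            // matched by in.readUTF()
      BytesWritable writable = new BytesWritable();
      for (byte[] row : serializedRows) {
        writable.set(row, 0, row.length);  // matched by writable.readFields(in)
        writable.write(out);
      }
    } finally {
      out.close();
    }
  }
}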
http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java new file mode 100644 index 0000000..3742857 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext; +import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; + +/** + * If we expect the number of keys for dynamic pruning to be too large we + * disable it. + * + * Cloned from RemoveDynamicPruningBySize + */ +public class SparkRemoveDynamicPruningBySize implements NodeProcessor { + + static final private Log LOG = LogFactory.getLog(SparkRemoveDynamicPruningBySize.class.getName()); + + @Override + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext, + Object... nodeOutputs) + throws SemanticException { + + OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext; + + SparkPartitionPruningSinkOperator op = (SparkPartitionPruningSinkOperator) nd; + SparkPartitionPruningSinkDesc desc = op.getConf(); + + if (desc.getStatistics().getDataSize() > context.getConf() + .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) { + Operator<?> child = op; + Operator<?> curr = op; + + while (curr.getChildOperators().size() <= 1) { + child = curr; + curr = curr.getParentOperators().get(0); + } + + curr.removeChild(child); + // at this point we've found the fork in the op pipeline that has the pruning as a child plan. + LOG.info("Disabling dynamic pruning for: " + + desc.getTableScan().getName() + + ". 
Expected data size is too big: " + desc.getStatistics().getDataSize()); + } + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index ad47547..180513e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -919,6 +919,7 @@ public class Vectorizer implements PhysicalPlanResolver { case FILESINK: case LIMIT: case EVENT: + case SPARKPRUNINGSINK: ret = true; break; case HASHTABLESINK: @@ -965,6 +966,7 @@ public class Vectorizer implements PhysicalPlanResolver { break; case LIMIT: case EVENT: + case SPARKPRUNINGSINK: ret = true; break; case HASHTABLESINK: http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java new file mode 100644 index 0000000..cc78227 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.spark; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; + +@Explain(displayName = "Spark Partition Pruning Sink Operator") +public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc { + + // column in the target table that will be pruned against + private String targetColumnName; + + private TableDesc table; + + private transient TableScanOperator tableScan; + + // the partition column we're interested in + private ExprNodeDesc partKey; + + private Path path; + + private String targetWork; + + @Explain(displayName = "tmp Path", explainLevels = { Explain.Level.EXTENDED }) + public Path getPath() { + return path; + } + + public void setPath(Path path) { + this.path = path; + } + + @Explain(displayName = "target work") + public String getTargetWork() { + return this.targetWork; + } + + public void setTargetWork(String targetWork) { + this.targetWork = targetWork; + } + + public TableScanOperator getTableScan() { + return tableScan; + } + + public void setTableScan(TableScanOperator tableScan) { + this.tableScan = tableScan; + } + + @Explain(displayName = "target column name") + public String getTargetColumnName() { + return targetColumnName; + } + + public void setTargetColumnName(String targetColumnName) { + this.targetColumnName = targetColumnName; + } + + public ExprNodeDesc getPartKey() { + return partKey; + } + + public void setPartKey(ExprNodeDesc partKey) { + this.partKey = partKey; + } + + public TableDesc getTable() { + return table; + } + + public void setTable(TableDesc table) { + this.table = table; + } + + @Explain(displayName = "partition key expr") + public String getPartKeyString() { + return partKey.getExprString(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java index 447f104..0a0c791 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -40,7 +41,6 @@ import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; -import org.apache.hadoop.hive.ql.plan.SparkWork; import java.io.Serializable; import java.util.HashMap; @@ -138,6 +138,13 @@ public class GenSparkProcContext implements NodeProcessorCtx { // This is necessary as sometimes semantic analyzer's mapping is different than operator's own alias. public final Map<String, Operator<? 
extends OperatorDesc>> topOps; + // The set of pruning sinks + public final Set<Operator<?>> pruningSinkSet; + + // The set of TableScanOperators for pruning OP trees + public final Set<Operator<?>> clonedPruningTableScanSet; + + @SuppressWarnings("unchecked") public GenSparkProcContext(HiveConf conf, ParseContext parseContext, @@ -153,8 +160,7 @@ public class GenSparkProcContext implements NodeProcessorCtx { this.inputs = inputs; this.outputs = outputs; this.topOps = topOps; - this.currentTask = (SparkTask) TaskFactory.get( - new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf); + this.currentTask = SparkUtilities.createSparkTask(conf); this.rootTasks.add(currentTask); this.leafOpToFollowingWorkInfo = new LinkedHashMap<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, ReduceWork>>(); @@ -177,5 +183,7 @@ public class GenSparkProcContext implements NodeProcessorCtx { this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>(); this.fileSinkSet = new LinkedHashSet<FileSinkOperator>(); this.fileSinkMap = new LinkedHashMap<FileSinkOperator, List<FileSinkOperator>>(); + this.pruningSinkSet = new LinkedHashSet<Operator<?>>(); + this.clonedPruningTableScanSet = new LinkedHashSet<Operator<?>>(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index 7992c88..1c0b79d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -46,7 +46,9 @@ import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; @@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.SparkWork; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import org.apache.hadoop.hive.ql.plan.TableDesc; /** * GenSparkUtils is a collection of shared helper methods to produce SparkWork @@ -132,7 +135,7 @@ public class GenSparkUtils { // remember which parent belongs to which tag reduceWork.getTagToInput().put(reduceSink.getConf().getTag(), - context.preceedingWork.getName()); + context.preceedingWork.getName()); // remember the output name of the reduce sink reduceSink.getConf().setOutputName(reduceWork.getName()); @@ -218,6 +221,16 @@ public class GenSparkUtils { Iterator<Operator<?>> newOpQueueIt = newOpQueue.iterator(); for (Operator<?> op : opQueue) { Operator<?> newOp = newOpQueueIt.next(); + + // We need to update rootToWorkMap in case the op is a key, since even + // though we clone the op tree, we're still using the same MapWork/ReduceWork. 
+ if (context.rootToWorkMap.containsKey(op)) { + context.rootToWorkMap.put(newOp, context.rootToWorkMap.get(op)); + } + // Don't remove the old entry - in SparkPartitionPruningSink it still + // refers to the old TS, and we need to look it up later in + // processPartitionPruningSink. + if (op instanceof FileSinkOperator) { List<FileSinkOperator> fileSinkList = context.fileSinkMap.get(op); if (fileSinkList == null) { @@ -225,6 +238,12 @@ } fileSinkList.add((FileSinkOperator) newOp); context.fileSinkMap.put((FileSinkOperator) op, fileSinkList); + } else if (op instanceof SparkPartitionPruningSinkOperator) { + SparkPartitionPruningSinkOperator oldPruningSink = (SparkPartitionPruningSinkOperator) op; + SparkPartitionPruningSinkOperator newPruningSink = (SparkPartitionPruningSinkOperator) newOp; + newPruningSink.getConf().setTableScan(oldPruningSink.getConf().getTableScan()); + context.pruningSinkSet.add(newPruningSink); + context.pruningSinkSet.remove(oldPruningSink); } } } @@ -337,6 +356,67 @@ } } + /** + * Populate partition pruning information from the pruning sink operator to the + * target MapWork (the MapWork for the big table side). The information includes the source table + * name, column name, and partition key expression. It also sets up the temporary path used to + * communicate between the target MapWork and the source BaseWork. + * + * Here "source" refers to the small table side, while "target" refers to the big + * table side. + * + * @param context the Spark proc context. + * @param pruningSink the pruning sink operator being processed. + */ + public void processPartitionPruningSink(GenSparkProcContext context, + SparkPartitionPruningSinkOperator pruningSink) { + SparkPartitionPruningSinkDesc desc = pruningSink.getConf(); + TableScanOperator ts = desc.getTableScan(); + MapWork targetWork = (MapWork) context.rootToWorkMap.get(ts); + + Preconditions.checkArgument( + targetWork != null, + "No targetWork found for tablescan " + ts); + + String targetId = SparkUtilities.getWorkId(targetWork); + + BaseWork sourceWork = getEnclosingWork(pruningSink, context); + String sourceId = SparkUtilities.getWorkId(sourceWork); + + // set up temporary path to communicate between the small/big table + Path tmpPath = targetWork.getTmpPathForPartitionPruning(); + if (tmpPath == null) { + Path baseTmpPath = context.parseContext.getContext().getMRTmpPath(); + tmpPath = SparkUtilities.generateTmpPathForPartitionPruning(baseTmpPath, targetId); + targetWork.setTmpPathForPartitionPruning(tmpPath); + LOG.info("Setting tmp path between source work and target work:\n" + tmpPath); + } + + desc.setPath(new Path(tmpPath, sourceId)); + desc.setTargetWork(targetWork.getName()); + + // store table descriptor in map-targetWork + if (!targetWork.getEventSourceTableDescMap().containsKey(sourceId)) { + targetWork.getEventSourceTableDescMap().put(sourceId, new LinkedList<TableDesc>()); + } + List<TableDesc> tables = targetWork.getEventSourceTableDescMap().get(sourceId); + tables.add(pruningSink.getConf().getTable()); + + // store column name in map-targetWork + if (!targetWork.getEventSourceColumnNameMap().containsKey(sourceId)) { + targetWork.getEventSourceColumnNameMap().put(sourceId, new LinkedList<String>()); + } + List<String> columns = targetWork.getEventSourceColumnNameMap().get(sourceId); + columns.add(desc.getTargetColumnName()); + + // store partition key expr in map-targetWork + if (!targetWork.getEventSourcePartKeyExprMap().containsKey(sourceId)) {
targetWork.getEventSourcePartKeyExprMap().put(sourceId, new LinkedList<ExprNodeDesc>()); + } + List<ExprNodeDesc> keys = targetWork.getEventSourcePartKeyExprMap().get(sourceId); + keys.add(desc.getPartKey()); + } + public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink, ReduceWork reduceWork) throws SemanticException { SparkEdgeProperty edgeProperty = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE); @@ -490,4 +570,33 @@ } return false; } + + /** + * getEnclosingWork finds the BaseWork any given operator belongs to. + */ + public BaseWork getEnclosingWork(Operator<?> op, GenSparkProcContext procCtx) { + List<Operator<?>> ops = new ArrayList<Operator<?>>(); + findRoots(op, ops); + for (Operator<?> r : ops) { + BaseWork work = procCtx.rootToWorkMap.get(r); + if (work != null) { + return work; + } + } + return null; + } + + /* + * findRoots returns all root operators (in ops) that result in operator op + */ + private void findRoots(Operator<?> op, List<Operator<?>> ops) { + List<Operator<?>> parents = op.getParentOperators(); + if (parents == null || parents.isEmpty()) { + ops.add(op); + return; + } + for (Operator<?> p : parents) { + findRoots(p, ops); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java index f7586a4..3b71af1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java @@ -20,15 +20,12 @@ package org.apache.hadoop.hive.ql.parse.spark; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -49,19 +46,12 @@ public class OptimizeSparkProcContext implements NodeProcessorCtx { private final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>(); private final Map<MapJoinOperator, Long> mjOpSizes = new HashMap<MapJoinOperator, Long>(); - // rootOperators are all the table scan operators in sequence - // of traversal - private final Deque<Operator<? extends OperatorDesc>> rootOperators; - public OptimizeSparkProcContext(HiveConf conf, ParseContext parseContext, - Set<ReadEntity> inputs, Set<WriteEntity> outputs, - Deque<Operator<? extends OperatorDesc>> rootOperators) { - + Set<ReadEntity> inputs, Set<WriteEntity> outputs) { this.conf = conf; this.parseContext = parseContext; this.inputs = inputs; this.outputs = outputs; - this.rootOperators = rootOperators; } public ParseContext getParseContext() { @@ -84,10 +74,6 @@ public class OptimizeSparkProcContext implements NodeProcessorCtx { return visitedReduceSinks; } - public Deque<Operator<?
extends OperatorDesc>> getRootOperators() { - return rootOperators; - } - public Map<MapJoinOperator, Long> getMjOpSizes() { return mjOpSizes; } http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 7f2c079..27a1d99 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -19,10 +19,8 @@ package org.apache.hadoop.hive.ql.parse.spark; import java.io.Serializable; import java.util.ArrayList; -import java.util.Deque; import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,6 +33,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -44,12 +43,14 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.CompositeProcessor; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.ForwardWalker; import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; @@ -58,7 +59,10 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.lib.TypeRule; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate; +import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization; +import org.apache.hadoop.hive.ql.optimizer.SparkRemoveDynamicPruningBySize; +import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits; import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; @@ -72,8 +76,8 @@ import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinHintOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer; import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc; import org.apache.hadoop.hive.ql.optimizer.spark.SparkSkewJoinResolver; -import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory; import org.apache.hadoop.hive.ql.optimizer.spark.SplitSparkWorkResolver; +import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics; 
 import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -83,7 +87,6 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;

 /**
  * SparkCompiler translates the operator plan into SparkTasks.
@@ -102,22 +105,70 @@ public class SparkCompiler extends TaskCompiler {
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
     PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
-    // Sequence of TableScan operators to be walked
-    Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
-    deque.addAll(pCtx.getTopOps().values());
-    OptimizeSparkProcContext procCtx = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs, deque);
-    // create a walker which walks the tree in a DFS manner while maintaining
-    // the operator stack.
+    OptimizeSparkProcContext procCtx = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs);
+
+    // Run Spark Dynamic Partition Pruning
+    runDynamicPartitionPruning(procCtx);
+
+    // Annotate OP tree with statistics
+    runStatsAnnotation(procCtx);
+
+    // Run Join related optimizations
+    runJoinOptimizations(procCtx);
+
+    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
+  }
+
+  private void runStatsAnnotation(OptimizeSparkProcContext procCtx) throws SemanticException {
+    new AnnotateWithStatistics().transform(procCtx.getParseContext());
+    new AnnotateWithOpTraits().transform(procCtx.getParseContext());
+  }
+
+  private void runDynamicPartitionPruning(OptimizeSparkProcContext procCtx)
+      throws SemanticException {
+    if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+      return;
+    }
+
+    ParseContext parseContext = procCtx.getParseContext();
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(
+        new RuleRegExp("Dynamic Partition Pruning",
+            FilterOperator.getOperatorName() + "%"),
+        new DynamicPartitionPruningOptimization());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    GraphWalker ogw = new ForwardWalker(disp);
+
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(parseContext.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    // need a new run of the constant folding because we might have created lots
+    // of "and true and true" conditions.
+    if (procCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
+      new ConstantPropagate().transform(parseContext);
+    }
+  }
+
+  private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws SemanticException {
+    ParseContext pCtx = procCtx.getParseContext();
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
-            ReduceSinkOperator.getOperatorName() + "%"),
-            new SetSparkReducerParallelism());
+        ReduceSinkOperator.getOperatorName() + "%"),
+        new SetSparkReducerParallelism());

     opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx));

     opRules.put(new TypeRule(MapJoinOperator.class), new SparkJoinHintOptimizer(pCtx));

+    opRules.put(new RuleRegExp("Disabling Dynamic Partition Pruning By Size",
+        SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
+        new SparkRemoveDynamicPruningBySize());
+
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
@@ -127,7 +178,6 @@ public class SparkCompiler extends TaskCompiler {
     ArrayList<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
     ogw.startWalking(topNodes, null);
-    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
   }

   /**
@@ -138,20 +188,90 @@ public class SparkCompiler extends TaskCompiler {
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException {
     PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
-    GenSparkUtils.getUtils().resetSequenceNumber();
-    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
-    GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils());
+    GenSparkUtils utils = GenSparkUtils.getUtils();
+    utils.resetSequenceNumber();
+    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);

     GenSparkProcContext procCtx = new GenSparkProcContext(
         conf, tempParseContext, mvTask, rootTasks, inputs, outputs, pCtx.getTopOps());

+    // -------------------------------- First Pass ---------------------------------- //
+    // Identify SparkPartitionPruningSinkOperators, and break OP tree if necessary
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("Clone OP tree for PartitionPruningSink",
+        SparkPartitionPruningSinkOperator.getOperatorName() + "%"),
+        new SplitOpTreeForDPP());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    // -------------------------------- Second Pass ---------------------------------- //
+    // Process operator tree in two steps: first we process the extra op trees generated
+    // in the first pass. Then we process the main op tree, and the result task will depend
+    // on the task generated in the first pass.
+    topNodes.clear();
+    topNodes.addAll(procCtx.topOps.values());
+    generateTaskTreeHelper(procCtx, topNodes);
+
+    // If this set is not empty, it means we need to generate a separate task for collecting
+    // the partitions used.
+    if (!procCtx.clonedPruningTableScanSet.isEmpty()) {
+      SparkTask pruningTask = SparkUtilities.createSparkTask(conf);
+      SparkTask mainTask = procCtx.currentTask;
+      pruningTask.addDependentTask(procCtx.currentTask);
+      procCtx.rootTasks.remove(procCtx.currentTask);
+      procCtx.rootTasks.add(pruningTask);
+      procCtx.currentTask = pruningTask;
+
+      topNodes.clear();
+      topNodes.addAll(procCtx.clonedPruningTableScanSet);
+      generateTaskTreeHelper(procCtx, topNodes);
+
+      procCtx.currentTask = mainTask;
+    }
+
+    // -------------------------------- Post Pass ---------------------------------- //
+
+    // we need to clone some operator plans and remove union operators still
+    for (BaseWork w : procCtx.workWithUnionOperators) {
+      GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+    }
+
+    // we need to fill MapWork with 'local' work and bucket information for SMB Join.
+    GenSparkUtils.getUtils().annotateMapWork(procCtx);
+
+    // finally make sure the file sink operators are set up right
+    for (FileSinkOperator fileSink : procCtx.fileSinkSet) {
+      GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
+    }
+
+    // Process partition pruning sinks
+    for (Operator<?> prunerSink : procCtx.pruningSinkSet) {
+      utils.processPartitionPruningSink(procCtx, (SparkPartitionPruningSinkOperator) prunerSink);
+    }
+
+    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+  }
+
+  private void generateTaskTreeHelper(GenSparkProcContext procCtx, List<Node> topNodes)
+      throws SemanticException {
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack. The dispatcher generates the plan from the operator tree
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils());
+
     opRules.put(new RuleRegExp("Split Work - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
+
+    opRules.put(new RuleRegExp("Split Work - SparkPartitionPruningSink",
+        SparkPartitionPruningSinkOperator.getOperatorName() + "%"), genSparkWork);
+
     opRules.put(new TypeRule(MapJoinOperator.class), new SparkReduceSinkMapJoinProc());

     opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
@@ -186,8 +306,10 @@ public class SparkCompiler extends TaskCompiler {
      *       SMBJoinOP
      *
      * Some of the other processors are expecting only one traversal beyond SMBJoinOp.
-     * We need to traverse from the big-table path only, and stop traversing on the small-table path once we reach SMBJoinOp.
-     * Also add some SMB join information to the context, so we can properly annotate the MapWork later on.
+     * We need to traverse from the big-table path only, and stop traversing on the
+     * small-table path once we reach SMBJoinOp.
+     * Also add some SMB join information to the context, so we can properly annotate
+     * the MapWork later on.
      */
     opRules.put(new TypeRule(SMBMapJoinOperator.class), new NodeProcessor() {
@@ -219,25 +341,8 @@ public class SparkCompiler extends TaskCompiler {
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    List<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
     GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
-
-    // we need to clone some operator plans and remove union operators still
-    for (BaseWork w: procCtx.workWithUnionOperators) {
-      GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
-    }
-
-    // we need to fill MapWork with 'local' work and bucket information for SMB Join.
-    GenSparkUtils.getUtils().annotateMapWork(procCtx);
-
-    // finally make sure the file sink operators are set up right
-    for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
-      GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
-    }
-
-    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
   }

   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
new file mode 100644
index 0000000..20432c7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
+
+/**
+ * This operator gets partition info from the upstream operators, and writes it
+ * to HDFS. It will later be read at the driver, and used for pruning the partitions
+ * for the big table side.
+ */
+public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPruningSinkDesc> {
+
+  @SuppressWarnings("deprecation")
+  protected transient Serializer serializer;
+  protected transient DataOutputBuffer buffer;
+  protected static final Log LOG = LogFactory.getLog(SparkPartitionPruningSinkOperator.class);
+
+  @SuppressWarnings("deprecation")
+  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    Collection<Future<?>> result = super.initializeOp(hconf);
+    serializer = (Serializer) ReflectionUtils.newInstance(
+        conf.getTable().getDeserializerClass(), null);
+    buffer = new DataOutputBuffer();
+    return result;
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+    ObjectInspector rowInspector = inputObjInspectors[0];
+    try {
+      Writable writableRow = serializer.serialize(row, rowInspector);
+      writableRow.write(buffer);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    if (!abort) {
+      try {
+        flushToFile();
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+    }
+  }
+
+  private void flushToFile() throws IOException {
+    // write an intermediate file to the specified path
+    // the format of the path is: tmpPath/targetWorkId/sourceWorkId/randInt
+    Path path = conf.getPath();
+    FileSystem fs = path.getFileSystem(this.getConfiguration());
+    fs.mkdirs(path);
+
+    while (true) {
+      path = new Path(path, String.valueOf(Utilities.randGen.nextInt()));
+      if (!fs.exists(path)) {
+        break;
+      }
+    }
+
+    short numOfRepl = fs.getDefaultReplication(path);
+
+    ObjectOutputStream out = null;
+    FSDataOutputStream fsout = null;
+
+    try {
+      fsout = fs.create(path, numOfRepl);
+      out = new ObjectOutputStream(new BufferedOutputStream(fsout, 4096));
+      out.writeUTF(conf.getTargetColumnName());
+      buffer.writeTo(out);
+    } catch (Exception e) {
+      try {
+        fs.delete(path, false);
+      } catch (Exception ex) {
+        LOG.warn("Exception happened while trying to clean partial file.");
+      }
+      throw e;
+    } finally {
+      if (out != null) {
+        LOG.info("Flushed to file: " + path);
+        out.close();
+      } else if (fsout != null) {
+        fsout.close();
+      }
+    }
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.SPARKPRUNINGSINK;
+  }
+
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  public static String getOperatorName() {
+    return "SPARKPRUNINGSINK";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
new file mode 100644
index 0000000..c140f67
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * This processor triggers on SparkPartitionPruningSinkOperator. For an operator tree like
+ * this:
+ *
+ *   Original Tree:
+ *      TS    TS
+ *       |     |
+ *      FIL   FIL
+ *       |     | \
+ *      RS    RS  SEL
+ *        \   /    |
+ *        JOIN    GBY
+ *                 |
+ *         SPARKPRUNINGSINK
+ *
+ * It removes the branch containing SPARKPRUNINGSINK from the original operator tree, and splits it into
+ * two separate trees:
+ *
+ *   Tree #1:            Tree #2:
+ *      TS    TS            TS
+ *       |     |             |
+ *      FIL   FIL           FIL
+ *       |     |             |
+ *      RS    RS            SEL
+ *        \   /              |
+ *        JOIN              GBY
+ *                           |
+ *                   SPARKPRUNINGSINK
+ *
+ * For MapJoinOperator, this optimizer will not do anything - the pruning branch should be
+ * executed within the same SparkTask.
+ */
+public class SplitOpTreeForDPP implements NodeProcessor {
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+    SparkPartitionPruningSinkOperator pruningSinkOp = (SparkPartitionPruningSinkOperator) nd;
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+
+    // Locate the op where the branch starts.
+    // This is guaranteed to succeed since the branch always follows the pattern
+    // as shown in the first picture above.
+    Operator<?> filterOp = pruningSinkOp;
+    Operator<?> selOp = null;
+    while (filterOp != null) {
+      if (filterOp.getNumChild() > 1) {
+        break;
+      } else {
+        selOp = filterOp;
+        filterOp = filterOp.getParentOperators().get(0);
+      }
+    }
+
+    // Check if this is a MapJoin. If so, do not split.
+    for (Operator<?> childOp : filterOp.getChildOperators()) {
+      if (childOp instanceof ReduceSinkOperator &&
+          childOp.getChildOperators().get(0) instanceof MapJoinOperator) {
+        context.pruningSinkSet.add(pruningSinkOp);
+        return null;
+      }
+    }
+
+    List<Operator<?>> roots = new LinkedList<Operator<?>>();
+    collectRoots(roots, pruningSinkOp);
+
+    List<Operator<?>> savedChildOps = filterOp.getChildOperators();
+    filterOp.setChildOperators(Utilities.makeList(selOp));
+
+    // Now clone the tree above selOp
+    List<Operator<?>> newRoots = Utilities.cloneOperatorTree(context.parseContext.getConf(), roots);
+    for (int i = 0; i < roots.size(); i++) {
+      TableScanOperator newTs = (TableScanOperator) newRoots.get(i);
+      TableScanOperator oldTs = (TableScanOperator) roots.get(i);
+      newTs.getConf().setTableMetadata(oldTs.getConf().getTableMetadata());
+    }
+    context.clonedPruningTableScanSet.addAll(newRoots);
+
+    // Restore broken links between operators, and remove the branch from the original tree
+    filterOp.setChildOperators(savedChildOps);
+    filterOp.removeChild(selOp);
+
+    // Find the cloned PruningSink and add it to pruningSinkSet
+    Set<Operator<?>> sinkSet = new HashSet<Operator<?>>();
+    for (Operator<?> root : newRoots) {
+      SparkUtilities.collectOp(sinkSet, root, SparkPartitionPruningSinkOperator.class);
+    }
+    Preconditions.checkArgument(sinkSet.size() == 1,
+        "AssertionError: expected to only contain one SparkPartitionPruningSinkOperator," +
+            " but found " + sinkSet.size());
+    SparkPartitionPruningSinkOperator clonedPruningSinkOp =
+        (SparkPartitionPruningSinkOperator) sinkSet.iterator().next();
+    clonedPruningSinkOp.getConf().setTableScan(pruningSinkOp.getConf().getTableScan());
+    context.pruningSinkSet.add(clonedPruningSinkOp);
+
+    return null;
+  }
+
+  /**
+   * Recursively collect all roots (e.g., table scans) that can be reached via this op.
+   * @param result contains all roots that can be reached via op
+   * @param op the op to examine.
+   */
+  private void collectRoots(List<Operator<?>> result, Operator<?> op) {
+    if (op.getNumParent() == 0) {
+      result.add(op);
+    } else {
+      for (Operator<?> parentOp : op.getParentOperators()) {
+        collectRoots(result, parentOp);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 3217df2..2cb9257 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -87,6 +87,8 @@ public class MapWork extends BaseWork {

   private Path tmpHDFSPath;

+  private Path tmpPathForPartitionPruning;
+
   private String inputformat;

   private String indexIntermediateFile;
@@ -455,6 +457,14 @@ public class MapWork extends BaseWork {
     this.tmpHDFSPath = tmpHDFSPath;
   }

+  public Path getTmpPathForPartitionPruning() {
+    return this.tmpPathForPartitionPruning;
+  }
+
+  public void setTmpPathForPartitionPruning(Path tmpPathForPartitionPruning) {
+    this.tmpPathForPartitionPruning = tmpPathForPartitionPruning;
+  }
+
   public void mergingInto(MapWork mapWork) {
     // currently, this is sole field affecting mergee task
     mapWork.useBucketizedHiveInputFormat |= useBucketizedHiveInputFormat;

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 9e9a2a2..d27f3ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -103,7 +103,7 @@ public class TableScanDesc extends AbstractOperatorDesc {

   private transient TableSample tableSample;

-  private transient final Table tableMetadata;
+  private transient Table tableMetadata;

   public TableScanDesc() {
     this(null, null);
@@ -284,6 +284,10 @@ public class TableScanDesc extends AbstractOperatorDesc {
     return tableMetadata;
   }

+  public void setTableMetadata(Table tableMetadata) {
+    this.tableMetadata = tableMetadata;
+  }
+
   public TableSample getTableSample() {
     return tableSample;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/42216997/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
index 363e49e..32af813 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
@@ -67,8 +67,18 @@ public class SyntheticJoinPredicate implements Transform {

   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
-    if (!pctx.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
-        || !pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+    boolean enabled = false;
+    String queryEngine = pctx.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE);
+
+    if (queryEngine.equals("tez")
+        && pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+      enabled = true;
+    } else if ((queryEngine.equals("spark")
+        && pctx.getConf().getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING))) {
+      enabled = true;
+    }
+
+    if (!enabled) {
       return pctx;
     }
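A note for readers following the new SparkPartitionPruningSinkOperator above: flushToFile()
defines a simple on-disk layout, an ObjectOutputStream carrying writeUTF(targetColumnName)
followed by the raw bytes of the rows accumulated in the DataOutputBuffer. The driver-side
consumer of these files is not part of this commit, so the following is only a hypothetical
sketch of how such a file could be read back; the class PruningFileReaderSketch and its
method name are invented for illustration.

import java.io.ObjectInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical counterpart to flushToFile(); not part of this patch. */
public class PruningFileReaderSketch {
  public static void read(Configuration conf, Path file) throws Exception {
    FileSystem fs = file.getFileSystem(conf);
    // Mirror flushToFile(): first a UTF string (the target partition column),
    // then the serialized row bytes from the DataOutputBuffer.
    try (ObjectInputStream in = new ObjectInputStream(fs.open(file))) {
      String targetColumn = in.readUTF();
      byte[] chunk = new byte[4096];
      int n;
      while ((n = in.read(chunk)) > 0) {
        // A real reader would hand these bytes to the table's Deserializer and
        // collect the distinct values of targetColumn to prune partitions.
      }
    }
  }
}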

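The SyntheticJoinPredicate change above reduces to a single boolean gate: synthetic join
predicates are generated when the engine is "tez" with TEZ_DYNAMIC_PARTITION_PRUNING enabled,
or "spark" with SPARK_DYNAMIC_PARTITION_PRUNING enabled. A minimal restatement of that
condition, using only the HiveConf API already exercised in the diff (the wrapper class and
method name are illustrative, not part of the patch):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

/** Illustrative restatement of the gating logic in SyntheticJoinPredicate. */
public class DppGateSketch {
  /** Returns true when synthetic join predicates should be generated. */
  public static boolean syntheticPredicatesEnabled(HiveConf conf) {
    String engine = conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE);
    return ("tez".equals(engine)
        && conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING))
        || ("spark".equals(engine)
        && conf.getBoolVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING));
  }
}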