Repository: hive Updated Branches: refs/heads/master 8a6d8186c -> 41fbe7bb7
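This commit (HIVE-13873, below) teaches the column pruner to track paths into struct fields, so a query that touches only some fields of a struct no longer forces the whole struct to be read. TableScanOperator/TableScanDesc gain "needed nested column paths", ColumnProjectionUtils carries them in the new hive.io.file.readNestedColumn.paths property, and the Parquet DataWritableReadSupport uses them to build a pruned requested schema. A minimal sketch of the effect, adapted from the TestDataWritableReadSupport cases added by this commit (the schema and column names are illustrative):

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    // File schema with a struct column holding two fields.
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message hive_schema {\n"
        + "  optional group structCol {\n"
        + "    optional int32 a;\n"
        + "    optional double b;\n"
        + "  }\n"
        + "}");

    // A query that references only structCol.a makes the pruner record that path;
    // the read support then narrows the Parquet request to that single leaf.
    MessageType pruned = DataWritableReadSupport.getProjectedSchema(
        fileSchema,
        Arrays.asList("structCol"),     // column names
        Arrays.asList(0),               // needed column indexes
        Arrays.asList("structCol.a"));  // needed nested column paths
    // 'pruned' now contains structCol with only field a; structCol.b is dropped from the scan.
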
HIVE-13873: Support column pruning for struct fields in select statement(Ferdinand Xu via Sun Chao) Closes #105 from winningsix/HIVE-13873. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/41fbe7bb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/41fbe7bb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/41fbe7bb Branch: refs/heads/master Commit: 41fbe7bb7d4ad1eb0510a08df22db59e7a81c245 Parents: 8a6d818 Author: Ferdinand Xu <[email protected]> Authored: Wed Oct 26 02:02:31 2016 +0800 Committer: Ferdinand Xu <[email protected]> Committed: Wed Oct 26 02:02:37 2016 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/exec/FetchTask.java | 2 +- .../hadoop/hive/ql/exec/SMBMapJoinOperator.java | 4 +- .../hadoop/hive/ql/exec/TableScanOperator.java | 10 +- .../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 2 +- .../hadoop/hive/ql/io/HiveInputFormat.java | 9 +- .../hive/ql/io/parquet/ProjectionPusher.java | 6 + .../io/parquet/convert/HiveStructConverter.java | 29 ++- .../parquet/read/DataWritableReadSupport.java | 138 +++++++++++- .../hive/ql/optimizer/ColumnPrunerProcCtx.java | 102 +++++++++ .../ql/optimizer/ColumnPrunerProcFactory.java | 28 +-- .../hadoop/hive/ql/optimizer/FieldNode.java | 80 +++++++ .../NestedColumnFieldPruningUtils.java | 91 ++++++++ .../hadoop/hive/ql/plan/TableScanDesc.java | 10 +- .../io/parquet/HiveParquetSchemaTestUtils.java | 75 +++++++ .../ql/io/parquet/TestHiveSchemaConverter.java | 49 +---- .../read/TestDataWritableReadSupport.java | 112 ++++++++++ .../ql/optimizer/TestColumnPrunerProcCtx.java | 215 +++++++++++++++++++ .../TestNestedColumnFieldPruningUtils.java | 89 ++++++++ .../hive/serde2/ColumnProjectionUtils.java | 60 +++++- 19 files changed, 1032 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 8c7d99d..e708d58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -76,7 +76,7 @@ public class FetchTask extends Task<FetchWork> implements Serializable { TableScanOperator ts = (TableScanOperator) source; // push down projections ColumnProjectionUtils.appendReadColumns( - job, ts.getNeededColumnIDs(), ts.getNeededColumns()); + job, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters HiveInputFormat.pushFilters(job, ts); http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 584eff4..7c1e344 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -21,12 +21,10 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import 
java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,7 +205,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp TableScanOperator ts = (TableScanOperator)aliasToWork.get(alias); // push down projections ColumnProjectionUtils.appendReadColumns( - jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); + jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters HiveInputFormat.pushFilters(jobClone, ts); http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index 0f02222..68477ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -220,7 +220,7 @@ public class TableScanOperator extends Operator<TableScanDesc> implements ObjectInspectorUtils.partialCopyToStandardObject(rdSize, row, rdSizeColumn, 1, (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); - currentStat.addToStat(StatsSetupConst.RAW_DATA_SIZE, (((LongWritable)rdSize.get(0)).get())); + currentStat.addToStat(StatsSetupConst.RAW_DATA_SIZE, (((LongWritable) rdSize.get(0)).get())); } } @@ -297,6 +297,14 @@ public class TableScanOperator extends Operator<TableScanDesc> implements conf.setNeededColumns(columnNames); } + public List<String> getNeededNestedColumnPaths() { + return conf.getNeededNestedColumnPaths(); + } + + public void setNeededNestedColumnPaths(List<String> nestedColumnPaths) { + conf.setNeededNestedColumnPaths(nestedColumnPaths); + } + public List<String> getNeededColumns() { return conf.getNeededColumns(); } http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index c9ff191..591ea97 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -475,7 +475,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab TableScanOperator ts = (TableScanOperator)work.getAliasToWork().get(entry.getKey()); // push down projections ColumnProjectionUtils.appendReadColumns( - jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); + jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters HiveInputFormat.pushFilters(jobClone, ts); http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index c4b9940..69956ec 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -18,9 +18,6 @@ package 
org.apache.hadoop.hive.ql.io; -import java.util.Arrays; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -29,11 +26,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.Map.Entry; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -513,7 +508,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } } - + protected static PartitionDesc getPartitionDescFromPath( Map<Path, PartitionDesc> pathToPartitionInfo, Path dir) throws IOException { @@ -632,7 +627,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> TableScanOperator ts = (TableScanOperator) op; // push down projections. ColumnProjectionUtils.appendReadColumns( - jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns()); + jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns(), ts.getNeededNestedColumnPaths()); // push down filters pushFilters(jobConf, ts); http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java index b058500..68407f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java @@ -99,6 +99,9 @@ public class ProjectionPusher { boolean allColumnsNeeded = false; boolean noFilters = false; Set<Integer> neededColumnIDs = new HashSet<Integer>(); + // To support nested column pruning, we need to track the path from the top to the nested + // fields + Set<String> neededNestedColumnPaths = new HashSet<String>(); List<ExprNodeGenericFuncDesc> filterExprs = new ArrayList<ExprNodeGenericFuncDesc>(); RowSchema rowSchema = null; @@ -112,6 +115,7 @@ public class ProjectionPusher { allColumnsNeeded = true; } else { neededColumnIDs.addAll(ts.getNeededColumnIDs()); + neededNestedColumnPaths.addAll(ts.getNeededNestedColumnPaths()); } rowSchema = ts.getSchema(); @@ -143,6 +147,8 @@ public class ProjectionPusher { if (!allColumnsNeeded) { if (!neededColumnIDs.isEmpty()) { ColumnProjectionUtils.appendReadColumns(jobConf, new ArrayList<Integer>(neededColumnIDs)); + ColumnProjectionUtils.appendNestedColumnPaths(jobConf, + new ArrayList<String>(neededNestedColumnPaths)); } } else { ColumnProjectionUtils.setReadAllColumns(jobConf); http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java index a89aa4d..936b371 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java @@ -85,7 +85,7 @@ public class HiveStructConverter extends HiveGroupConverter { List<Type> selectedFields = 
selectedGroupType.getFields(); for (int i = 0; i < selectedFieldCount; i++) { Type subtype = selectedFields.get(i); - if (containingGroupType.getFields().contains(subtype)) { + if (isSubType(containingGroupType, subtype)) { int fieldIndex = containingGroupType.getFieldIndex(subtype.getName()); TypeInfo _hiveTypeInfo = getFieldTypeIgnoreCase(hiveTypeInfo, subtype.getName(), fieldIndex); converters[i] = getFieldConverter(subtype, fieldIndex, _hiveTypeInfo); @@ -96,6 +96,33 @@ public class HiveStructConverter extends HiveGroupConverter { } } + // This method is used to check whether the subType is a sub type of the groupType. + // For nested attribute, we need to check its existence by the root path in a recursive way. + private boolean isSubType( + final GroupType groupType, + final Type subtype) { + if (subtype.isPrimitive() || subtype.isRepetition(Type.Repetition.REPEATED)) { + return groupType.getFields().contains(subtype); + } else { + for (Type g : groupType.getFields()) { + if (!g.isPrimitive() && g.getName().equals(subtype.getName())) { + // check all elements are contained in g + boolean containsAll = false; + for (Type subSubType : subtype.asGroupType().getFields()) { + containsAll = isSubType(g.asGroupType(), subSubType); + if (!containsAll) { + break; + } + } + if (containsAll) { + return containsAll; + } + } + } + return false; + } + } + private TypeInfo getFieldTypeIgnoreCase(TypeInfo hiveTypeInfo, String fieldName, int fieldIndex) { if (hiveTypeInfo == null) { return null; http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 3e38cc7..8d8b0c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -14,10 +14,13 @@ package org.apache.hadoop.hive.ql.io.parquet.read; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -25,6 +28,8 @@ import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.optimizer.FieldNode; +import org.apache.hadoop.hive.ql.optimizer.NestedColumnFieldPruningUtils; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; @@ -135,7 +140,7 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { ((StructTypeInfo) colType).getAllStructFieldNames(), ((StructTypeInfo) colType).getAllStructFieldTypeInfos() ); - + Type[] typesArray = groupFields.toArray(new Type[0]); return Types.buildGroup(fieldType.getRepetition()) .addFields(typesArray) @@ -164,7 +169,7 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { } /** - * Searchs column names by name on a given 
Parquet message schema, and returns its projected + * Searches column names by name on a given Parquet message schema, and returns its projected * Parquet schema types. * * @param schema Message type schema where to search for column names. @@ -182,7 +187,7 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { } /** - * Searchs column names by index on a given Parquet file schema, and returns its corresponded + * Searches column names by indexes on a given Parquet file schema, and returns its corresponded * Parquet schema types. * * @param schema Message schema where to search for column names. @@ -200,6 +205,55 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { } else { //prefixing with '_mask_' to ensure no conflict with named //columns in the file schema + schemaTypes.add( + Types.optional(PrimitiveTypeName.BINARY).named("_mask_" + colNames.get(i))); + } + } + } + + return new MessageType(schema.getName(), schemaTypes); + } + + /** + * Generate the projected schema from colIndexes and nested column paths. If the column is + * contained by colIndex, it will be added directly, otherwise it will build a group type which + * contains all required sub types using nestedColumnPaths. + * @param schema original schema + * @param colNames + * @param colIndexes the index of needed columns + * @param nestedColumnPaths the paths for nested columns + * @return + */ + public static MessageType getProjectedSchema( + MessageType schema, + List<String> colNames, + List<Integer> colIndexes, + List<String> nestedColumnPaths) { + List<Type> schemaTypes = new ArrayList<Type>(); + + Map<String, FieldNode> prunedCols = getPrunedNestedColumns(nestedColumnPaths); + for (Integer i : colIndexes) { + if (i < colNames.size()) { + if (i < schema.getFieldCount()) { + Type t = schema.getType(i); + if (!prunedCols.containsKey(t.getName())) { + schemaTypes.add(schema.getType(i)); + } else { + if (t.isPrimitive()) { + // For primitive type, add directly. + schemaTypes.add(t); + } else { + // For group type, we need to build the projected group type with required leaves + List<Type> g = + projectLeafTypes(Arrays.asList(t), Arrays.asList(prunedCols.get(t.getName()))); + if (!g.isEmpty()) { + schemaTypes.addAll(g); + } + } + } + } else { + //prefixing with '_mask_' to ensure no conflict with named + //columns in the file schema schemaTypes.add(Types.optional(PrimitiveTypeName.BINARY).named("_mask_" + colNames.get(i))); } } @@ -209,6 +263,79 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { } /** + * Return the columns which contains required nested attribute level + * e.g. 
+ * Given struct a <x:int, y:int> and a is required while y is not, so the method will return a + * who contains the attribute x + * + * @param nestedColPaths the paths for required nested attribute + * @return column list contains required nested attribute + */ + private static Map<String, FieldNode> getPrunedNestedColumns(List<String> nestedColPaths) { + Map<String, FieldNode> resMap = new HashMap<>(); + if (nestedColPaths.isEmpty()) { + return resMap; + } + for (String s : nestedColPaths) { + String c = StringUtils.split(s, '.')[0]; + if (!resMap.containsKey(c)) { + FieldNode f = NestedColumnFieldPruningUtils.addNodeByPath(null, s); + resMap.put(c, f); + } else { + resMap.put(c, NestedColumnFieldPruningUtils.addNodeByPath(resMap.get(c), s)); + } + } + return resMap; + } + + private static GroupType buildProjectedGroupType( + GroupType originalType, + List<Type> types) { + if (types == null || types.isEmpty()) { + return null; + } + return new GroupType(originalType.getRepetition(), originalType.getName(), types); + } + + private static List<Type> projectLeafTypes( + List<Type> types, + List<FieldNode> nodes) { + List<Type> res = new ArrayList<>(); + if (nodes.isEmpty()) { + return res; + } + Map<String, FieldNode> fieldMap = new HashMap<>(); + for (FieldNode n : nodes) { + fieldMap.put(n.getFieldName(), n); + } + for (Type type : types) { + String tn = type.getName(); + + if (fieldMap.containsKey(tn)) { + FieldNode f = fieldMap.get(tn); + if (f.getNodes().isEmpty()) { + // no child, no need for pruning + res.add(type); + } else { + if (type instanceof GroupType) { + GroupType groupType = type.asGroupType(); + List<Type> ts = projectLeafTypes(groupType.getFields(), f.getNodes()); + GroupType g = buildProjectedGroupType(groupType, ts); + if (g != null) { + res.add(g); + } + } else { + throw new RuntimeException( + "Primitive type " + f.getFieldName() + "should not " + "doesn't match type" + f + .toString()); + } + } + } + } + return res; + } + + /** * It creates the readContext for Parquet side with the requested schema during the init phase. 
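* The requested schema is built from both the plain column indexes (hive.io.file.readcolumn.ids) and the nested column paths (hive.io.file.readNestedColumn.paths), so the Parquet reader is asked only for the pruned struct leaves.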
* * @param context @@ -246,10 +373,11 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(indexAccess)); this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + List<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration); List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration); if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) { - MessageType requestedSchemaByUser = - getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted); + MessageType requestedSchemaByUser = getProjectedSchema(tableSchema, columnNamesList, + indexColumnsWanted, groupPaths); return new ReadContext(requestedSchemaByUser, contextMetadata); } else { return new ReadContext(tableSchema, contextMetadata); http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java index 611a6b7..4364298 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java @@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.optimizer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; @@ -37,9 +39,15 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnListDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; /** * This class implements the processor context for Column Pruner. @@ -50,6 +58,11 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx { private final Map<Operator<? extends OperatorDesc>, List<String>> prunedColLists; + /** + * This map stores the pruned nested column path for each operator + */ + private final Map<Operator<? extends OperatorDesc>, List<String>> prunedNestedColLists; + private final Map<CommonJoinOperator, Map<Byte, List<String>>> joinPrunedColLists; private final Map<UnionOperator, List<Integer>> unionPrunedColLists; @@ -57,6 +70,7 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx { public ColumnPrunerProcCtx(ParseContext pctx) { this.pctx = pctx; prunedColLists = new HashMap<Operator<? extends OperatorDesc>, List<String>>(); + prunedNestedColLists = new HashMap<Operator<? 
extends OperatorDesc>, List<String>>(); joinPrunedColLists = new HashMap<CommonJoinOperator, Map<Byte, List<String>>>(); unionPrunedColLists = new HashMap<>(); } @@ -84,6 +98,10 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx { return prunedColLists; } + public Map<Operator<? extends OperatorDesc>, List<String>> getPrunedNestedColLists() { + return prunedNestedColLists; + } + /** * Creates the list of internal column names(these names are used in the * RowResolver and are different from the external column names) that are @@ -138,6 +156,27 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx { } /** + * Get the path to the root column for the nested column attribute + * + * @param curOp current operator + * @return the nested column paths for current operator and its child operator + */ + public List<String> genNestedColPaths(Operator<? extends OperatorDesc> curOp) { + if (curOp.getChildOperators() == null) { + return null; + } + Set<String> groupPathsList = new HashSet<>(); + + for (Operator<? extends OperatorDesc> child : curOp.getChildOperators()) { + if (prunedNestedColLists.containsKey(child)) { + groupPathsList.addAll(prunedNestedColLists.get(child)); + } + } + + return new ArrayList<>(groupPathsList); + } + + /** * Creates the list of internal column names(these names are used in the * RowResolver and are different from the external column names) that are * needed in the subtree. These columns eventually have to be selected from @@ -239,6 +278,69 @@ public class ColumnPrunerProcCtx implements NodeProcessorCtx { } /** + * Creates the list of internal group paths for select * expressions. + * + * @param op The select operator. + * @param paths The list of nested column paths returned by the children of the + * select operator. + * @return List<String> of the nested column path from leaf to the root. + */ + public List<String> getSelectNestedColPathsFromChildren( + SelectOperator op, + List<String> paths) { + List<String> groups = new ArrayList<>(); + SelectDesc conf = op.getConf(); + + if (paths != null && conf.isSelStarNoCompute()) { + groups.addAll(paths); + return groups; + } + + List<ExprNodeDesc> selectDescs = conf.getColList(); + + List<String> outputColumnNames = conf.getOutputColumnNames(); + for (int i = 0; i < outputColumnNames.size(); i++) { + if (paths == null || paths.contains(outputColumnNames.get(i))) { + ExprNodeDesc desc = selectDescs.get(i); + List<String> gp = getNestedColPathByDesc(desc); + groups.addAll(gp); + } + } + + return groups; + } + + // Entry method + private List<String> getNestedColPathByDesc(ExprNodeDesc desc) { + List<String> res = new ArrayList<>(); + getNestedColsFromExprNodeDesc(desc, "", res); + return res; + } + + private void getNestedColsFromExprNodeDesc( + ExprNodeDesc desc, + String pathToRoot, + List<String> paths) { + if (desc instanceof ExprNodeColumnDesc) { + String f = ((ExprNodeColumnDesc) desc).getColumn(); + String p = pathToRoot.isEmpty() ? f : f + "." + pathToRoot; + paths.add(p); + } else if (desc instanceof ExprNodeFieldDesc) { + String f = ((ExprNodeFieldDesc) desc).getFieldName(); + String p = pathToRoot.isEmpty() ? f : f + "." 
+ pathToRoot; + getNestedColsFromExprNodeDesc(((ExprNodeFieldDesc) desc).getDesc(), p, paths); + } else { + List<ExprNodeDesc> children = desc.getChildren(); + if (children == null || children.isEmpty()) { + return; + } + for (ExprNodeDesc c : children) { + getNestedColsFromExprNodeDesc(c, pathToRoot, paths); + } + } + } + + /** * Create the list of internal columns for select tag of LV */ public List<String> getSelectColsFromLVJoin(RowSchema rs, http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java index a2a7f00..6ca4df9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java @@ -70,7 +70,6 @@ import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.UnionDesc; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFInputDef; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; @@ -323,9 +322,9 @@ public final class ColumnPrunerProcFactory { } else { prunedCols = referencedColumns; } - - List<ColumnInfo> newRS = prunedColumnsList(prunedCols, op.getSchema(), funcDef); - + + List<ColumnInfo> newRS = prunedColumnsList(prunedCols, op.getSchema(), funcDef); + op.getSchema().setSignature(new ArrayList<ColumnInfo>(newRS)); ShapeDetails outputShape = funcDef.getStartOfChain().getInput().getOutputShape(); @@ -333,7 +332,7 @@ public final class ColumnPrunerProcFactory { return null; } - private List<ColumnInfo> buildPrunedRS(List<String> prunedCols, RowSchema oldRS) + private List<ColumnInfo> buildPrunedRS(List<String> prunedCols, RowSchema oldRS) throws SemanticException { ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>(); HashSet<String> prunedColsSet = new HashSet<String>(prunedCols); @@ -355,7 +354,7 @@ public final class ColumnPrunerProcFactory { } return columns; } - + private RowResolver buildPrunedRR(List<String> prunedCols, RowSchema oldRS) throws SemanticException { RowResolver resolver = new RowResolver(); @@ -403,12 +402,12 @@ public final class ColumnPrunerProcFactory { } else { pDef.getOutputShape().setRr(buildPrunedRR(prunedCols, oldRS)); } - + PTFInputDef input = pDef.getInput(); if (input instanceof PartitionedTableFunctionDef) { return prunedColumnsList(prunedCols, oldRS, (PartitionedTableFunctionDef)input); } - + ArrayList<String> inputColumns = prunedInputList(prunedCols, input); input.getOutputShape().setRr(buildPrunedRR(inputColumns, oldRS)); input.getOutputShape().setColumnNames(inputColumns); @@ -486,12 +485,15 @@ public final class ColumnPrunerProcFactory { } cols = cols == null ? new ArrayList<String>() : cols; + List nestedCols = cppCtx.genNestedColPaths((Operator<? extends OperatorDesc>) nd); - cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd, - cols); + cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd, cols); + cppCtx.getPrunedNestedColLists().put((Operator<? 
extends OperatorDesc>) nd, nestedCols); RowSchema inputRS = scanOp.getSchema(); setupNeededColumns(scanOp, inputRS, cols); + scanOp.setNeededNestedColumnPaths(nestedCols); + return null; } } @@ -712,12 +714,12 @@ public final class ColumnPrunerProcFactory { ((SelectDesc)select.getConf()).setColList(colList); ((SelectDesc)select.getConf()).setOutputColumnNames(outputColNames); pruneOperator(ctx, select, outputColNames); - + Operator<?> udtfPath = op.getChildOperators().get(LateralViewJoinOperator.UDTF_TAG); List<String> lvFCols = new ArrayList<String>(cppCtx.getPrunedColLists().get(udtfPath)); lvFCols = Utilities.mergeUniqElems(lvFCols, outputColNames); pruneOperator(ctx, op, lvFCols); - + return null; } } @@ -772,7 +774,7 @@ public final class ColumnPrunerProcFactory { // and return the ones which have a marked column cppCtx.getPrunedColLists().put(op, cppCtx.getSelectColsFromChildren(op, cols)); - + cppCtx.getPrunedNestedColLists().put(op, cppCtx.getSelectNestedColPathsFromChildren(op, cols)); if (cols == null || conf.isSelStarNoCompute()) { return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java new file mode 100644 index 0000000..1579797 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/FieldNode.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class FieldNode { + private String fieldName; + private List<FieldNode> nodes; + + public FieldNode(String fieldName) { + this.fieldName = fieldName; + nodes = new ArrayList<>(); + } + + public String getFieldName() { + return fieldName; + } + + public void addFieldNodes(FieldNode... 
nodes) { + if (nodes != null || nodes.length > 0) { + this.nodes.addAll(Arrays.asList(nodes)); + } + } + + public List<FieldNode> getNodes() { + return nodes; + } + + @Override + public String toString() { + String res = fieldName; + if (nodes.size() > 0) { + res += "["; + for (int i = 0; i < nodes.size(); i++) { + if (i == nodes.size() - 1) { + res += nodes.get(i).toString(); + } else { + res += nodes.get(i).toString() + ","; + } + } + res += "]"; + } + return res; + } + + @Override + public boolean equals(Object object) { + FieldNode fieldNode = (FieldNode) object; + if (!fieldName.equals(fieldNode.getFieldName()) || fieldNode.getNodes().size() != fieldNode + .getNodes().size()) { + return false; + } + + for (int i = 0; i < fieldNode.getNodes().size(); i++) { + if (fieldNode.getNodes().get(i).equals(nodes.get(i))) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NestedColumnFieldPruningUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NestedColumnFieldPruningUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NestedColumnFieldPruningUtils.java new file mode 100644 index 0000000..0f5629d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NestedColumnFieldPruningUtils.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer; + +import java.util.Arrays; +import java.util.List; + +public class NestedColumnFieldPruningUtils { + + /** + * Add a leaf node to the field tree if the specified path is not contained by + * current tree specified by the passed parameter field node. + * + * @param fieldNode the root of the column tree + * @param path contains the path from root to leaf + * @return the root of the newly built tree + */ + public static FieldNode addNodeByPath( + FieldNode fieldNode, + String path) { + if (path == null || path.isEmpty()) { + return fieldNode; + } + boolean found = false; + int index = 0; + String[] ps = path.split("\\."); + FieldNode c = fieldNode; + if (fieldNode != null) { + List<FieldNode> currentList = Arrays.asList(c); + while (index < ps.length) { + found = false; + for (FieldNode n : currentList) { + if (n.getFieldName().equals(ps[index])) { + found = true; + // If the matched field is leaf which means all leaves are required, not need to go + // deeper. 
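+ // (A leaf node marks its whole subtree as required, so the existing tree already covers this path.)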
+ if (n.getNodes().isEmpty()) { + return fieldNode; + } + c = n; + currentList = c.getNodes(); + break; + } + } + if (found) { + index++; + } else { + break; + } + } + } + + if (!found) { + while (index < ps.length) { + FieldNode n = new FieldNode(ps[index]); + if (fieldNode == null) { + // rebuild the tree since original is empty + fieldNode = n; + } + if (c != null) { + c.addFieldNodes(n); + } + c = n; + index++; + } + } else { + if (index == ps.length) { + // Consolidation since all leaves are required. + c.getNodes().clear(); + } + } + return fieldNode; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index ebe613e..2cb0935 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.parse.TableSample; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.serde.serdeConstants; - /** * Table Scan Descriptor Currently, data is only read from a base source as part * of map-reduce framework. So, nothing is stored in the descriptor. But, more @@ -81,6 +80,7 @@ public class TableScanDesc extends AbstractOperatorDesc { // SELECT count(*) FROM t). private List<Integer> neededColumnIDs; private List<String> neededColumns; + private List<String> neededNestedColumnPaths; // all column names referenced including virtual columns. used in ColumnAccessAnalyzer private transient List<String> referencedColumns; @@ -202,6 +202,14 @@ public class TableScanDesc extends AbstractOperatorDesc { return neededColumnIDs; } + public List<String> getNeededNestedColumnPaths() { + return neededNestedColumnPaths; + } + + public void setNeededNestedColumnPaths(List<String> neededNestedColumnPaths) { + this.neededNestedColumnPaths = neededNestedColumnPaths; + } + public void setNeededColumns(List<String> neededColumns) { this.neededColumns = neededColumns; } http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java new file mode 100644 index 0000000..510d256 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/HiveParquetSchemaTestUtils.java @@ -0,0 +1,75 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet; + +import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.Type; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HiveParquetSchemaTestUtils { + + public static List<String> createHiveColumnsFrom(final String columnNamesStr) { + List<String> columnNames; + if (columnNamesStr.length() == 0) { + columnNames = new ArrayList<String>(); + } else { + columnNames = Arrays.asList(columnNamesStr.split(",")); + } + + return columnNames; + } + + public static List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) { + List<TypeInfo> columnTypes; + + if (columnsTypeStr.length() == 0) { + columnTypes = new ArrayList<TypeInfo>(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr); + } + + return columnTypes; + } + + public static void testConversion( + final String columnNamesStr, + final String columnsTypeStr, + final String actualSchema) throws Exception { + final List<String> columnNames = createHiveColumnsFrom(columnNamesStr); + final List<TypeInfo> columnTypes = createHiveTypeInfoFrom(columnsTypeStr); + final MessageType messageTypeFound = HiveSchemaConverter.convert(columnNames, columnTypes); + final MessageType expectedMT = MessageTypeParser.parseMessageType(actualSchema); + assertEquals("converting " + columnNamesStr + ": " + columnsTypeStr + " to " + actualSchema, + expectedMT, messageTypeFound); + + // Required to check the original types manually as PrimitiveType.equals does not care about it + List<Type> expectedFields = expectedMT.getFields(); + List<Type> actualFields = messageTypeFound.getFields(); + for (int i = 0, n = expectedFields.size(); i < n; ++i) { + OriginalType exp = expectedFields.get(i).getOriginalType(); + OriginalType act = actualFields.get(i).getOriginalType(); + assertEquals("Original types of the field do not match", exp, act); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java index 256031e..137c764 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java @@ -13,63 +13,22 @@ */ package org.apache.hadoop.hive.ql.io.parquet; +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.createHiveColumnsFrom; +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.createHiveTypeInfoFrom; +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.testConversion; import static org.junit.Assert.assertEquals; -import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Type.Repetition; import org.junit.Test; -public class TestHiveSchemaConverter { - - private List<String> createHiveColumnsFrom(final String columnNamesStr) { - List<String> columnNames; - if (columnNamesStr.length() == 0) { - columnNames = new ArrayList<String>(); - } else { - columnNames = Arrays.asList(columnNamesStr.split(",")); - } - - return columnNames; - } - - private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) { - List<TypeInfo> columnTypes; - - if (columnsTypeStr.length() == 0) { - columnTypes = new ArrayList<TypeInfo>(); - } else { - columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr); - } - return columnTypes; - } - - private void testConversion(final String columnNamesStr, final String columnsTypeStr, final String expectedSchema) throws Exception { - final List<String> columnNames = createHiveColumnsFrom(columnNamesStr); - final List<TypeInfo> columnTypes = createHiveTypeInfoFrom(columnsTypeStr); - final MessageType messageTypeFound = HiveSchemaConverter.convert(columnNames, columnTypes); - final MessageType expectedMT = MessageTypeParser.parseMessageType(expectedSchema); - assertEquals("converting " + columnNamesStr + ": " + columnsTypeStr + " to " + expectedSchema, expectedMT, messageTypeFound); - - // Required to check the original types manually as PrimitiveType.equals does not care about it - List<Type> expectedFields = expectedMT.getFields(); - List<Type> actualFields = messageTypeFound.getFields(); - for (int i = 0, n = expectedFields.size(); i < n; ++i) { - OriginalType exp = expectedFields.get(i).getOriginalType(); - OriginalType act = actualFields.get(i).getOriginalType(); - assertEquals("Original types of the field do not match", exp, act); - } - } +public class TestHiveSchemaConverter { @Test public void testSimpleType() throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java new file mode 100644 index 0000000..b3aaca6 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestDataWritableReadSupport.java @@ -0,0 +1,112 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet.read; + +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Test; + +import java.util.Arrays; + +import static org.apache.hadoop.hive.ql.io.parquet.HiveParquetSchemaTestUtils.testConversion; + +public class TestDataWritableReadSupport { + @Test + public void testGetProjectedSchema1() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + " optional boolean c;\n" + + " optional fixed_len_byte_array(3) d (DECIMAL(5,2));\n" + + " }\n" + + "}\n"); + + testConversion("structCol", "struct<a:int>", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), + Arrays.asList("structCol.a")).toString()); + } + + @Test + public void testGetProjectedSchema2() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + " }\n" + + "}\n"); + + testConversion("structCol", "struct<a:int,b:double>", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), + Arrays.asList("structCol.a", "structCol.b")).toString()); + } + + @Test + public void testGetProjectedSchema3() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + " }\n" + + " optional boolean c;\n" + + "}\n"); + + testConversion("structCol,c", "struct<b:double>,boolean", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol", "c"), Arrays.asList(0, 1), + Arrays.asList("structCol.b", "c")).toString()); + } + + @Test + public void testGetProjectedSchema4() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional group subStructCol {\n" + + " optional int64 b;\n" + + " optional boolean c;\n" + + " }\n" + + " }\n" + + " optional boolean d;\n" + + "}\n"); + + testConversion("structCol", "struct<subStructCol:struct<b:bigint>>", DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), + Arrays.asList("structCol.subStructCol.b")).toString()); + } + + @Test + public void testGetProjectedSchema5() throws Exception { + MessageType originalMsg = MessageTypeParser.parseMessageType( + "message hive_schema {\n" + + " optional group structCol {\n" + + " optional int32 a;\n" + + " optional group subStructCol {\n" + + " optional int64 b;\n" + + " optional boolean c;\n" + + " }\n" + + " }\n" + + " optional boolean d;\n" + + "}\n"); + + testConversion("structCol", "struct<subStructCol:struct<b:bigint,c:boolean>>", + DataWritableReadSupport + .getProjectedSchema(originalMsg, Arrays.asList("structCol"), Arrays.asList(0), Arrays + .asList("structCol.subStructCol", "structCol.subStructCol.b", + "structCol.subStructCol.c")).toString()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java ---------------------------------------------------------------------- diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java new file mode 100644 index 0000000..dfcd154 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestColumnPrunerProcCtx.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFPower; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestColumnPrunerProcCtx { + // struct<a:boolean,b:double> + private static TypeInfo col1Type; + // double + private static TypeInfo col2Type; + // struct<col1:struct<a:boolean,b:double>,col2:double> + private static TypeInfo col3Type; + + @BeforeClass + public static void setup(){ + List<String> ns = new ArrayList<>(); + ns.add("a"); + ns.add("b"); + List<TypeInfo> tis = new ArrayList<>(); + TypeInfo aType = TypeInfoFactory.booleanTypeInfo; + TypeInfo bType = TypeInfoFactory.doubleTypeInfo; + tis.add(aType); + tis.add(bType); + col1Type = TypeInfoFactory.getStructTypeInfo(ns, tis); + col2Type = TypeInfoFactory.doubleTypeInfo; + + List<String> names = new ArrayList<>(); + names.add("col1"); + names.add("col2"); + + List<TypeInfo> typeInfos = new ArrayList<>(); + typeInfos.add(col1Type); + typeInfos.add(col2Type); + col3Type = TypeInfoFactory.getStructTypeInfo(names, typeInfos); + } + + // Test select root.col1.a from root:struct<col1:struct<a:boolean,b:double>,col2:double> + @Test + public void testGetSelectNestedColPathsFromChildren1() { + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc col1 = new ExprNodeFieldDesc(col1Type, colDesc, "col1", false); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(TypeInfoFactory.booleanTypeInfo, col1, "a", false); + 
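// fieldDesc models the nested reference root.col1.a, so the expected pruned path below is "root.col1.a".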
final List<String> paths = Arrays.asList("_col0"); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(fieldDesc), paths); + List<String> groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col1.a" }, groups.toArray(new String[groups.size()])); + } + + // Test select root.col1 from root:struct<col1:struct<a:boolean,b:double>,col2:double> + @Test + public void testGetSelectNestedColPathsFromChildren2() { + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(col1Type, colDesc, "col1", false); + final List<String> paths = Arrays.asList("_col0"); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(fieldDesc), paths); + List<String> groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col1" }, groups.toArray(new String[groups.size()])); + } + + // Test select root.col2 from root:struct<col1:struct<a:boolean,b:double>,col2:double> + @Test + public void testGetSelectNestedColPathsFromChildren3() { + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(col1Type, colDesc, "col2", false); + final List<String> paths = Arrays.asList("_col0"); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(fieldDesc), paths); + List<String> groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col2" }, groups.toArray(new String[groups.size()])); + } + + // Test select root from root:struct<col1:struct<a:boolean,b:double>,col2:double> + @Test + public void testGetSelectNestedColPathsFromChildren4() { + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + final List<String> paths = Arrays.asList("_col0"); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(colDesc), paths); + List<String> groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root" }, groups.toArray(new String[groups.size()])); + } + + // Test select named_struct from named_struct:struct<a:boolean,b:double> + @Test + public void testGetSelectNestedColPathsFromChildren5(){ + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeConstantDesc constADesc = new ExprNodeConstantDesc(TypeInfoFactory.booleanTypeInfo, "a"); + ExprNodeConstantDesc constBDesc = new ExprNodeConstantDesc(TypeInfoFactory.doubleTypeInfo, "b"); + List<ExprNodeDesc> list = new ArrayList<>(); + list.add(constADesc); + list.add(constBDesc); + GenericUDF udf = mock(GenericUDF.class); + ExprNodeDesc funcDesc = new ExprNodeGenericFuncDesc(col1Type, udf, "named_struct", list); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(TypeInfoFactory.doubleTypeInfo, funcDesc, "foo", + false); + + final List<String> paths = Arrays.asList("_col0"); + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(fieldDesc), paths); + List<String> groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + // Return empty result since only constant Desc exists + assertEquals(0, groups.size()); + } + + // Test select abs(root.col1.b) from table test(root struct<col1:struct<a:boolean,b:double>, + // col2:double>); + @Test + 
public void testGetSelectNestedColPathsFromChildren6(){ + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc col1 = new ExprNodeFieldDesc(col1Type, colDesc, "col1", false); + ExprNodeDesc fieldDesc = new ExprNodeFieldDesc(TypeInfoFactory.doubleTypeInfo, col1, "b", + false); + final List<String> paths = Arrays.asList("_col0"); + + GenericUDF udf = mock(GenericUDFBridge.class); + + List<ExprNodeDesc> list = new ArrayList<>(); + list.add(fieldDesc); + ExprNodeDesc funcDesc = new ExprNodeGenericFuncDesc(TypeInfoFactory.binaryTypeInfo, udf, "abs", + list); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(funcDesc), paths); + List<String> groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col1.b" }, groups.toArray(new String[groups.size()])); + } + + // Test select pow(root.col1.b, root.col2) from table test(root + // struct<col1:struct<a:boolean,b:double>, col2:double>); + @Test + public void testGetSelectNestedColPathsFromChildren7(){ + ColumnPrunerProcCtx ctx = new ColumnPrunerProcCtx(null); + + ExprNodeDesc colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc col1 = new ExprNodeFieldDesc(col1Type, colDesc, "col1", false); + ExprNodeDesc fieldDesc1 = + new ExprNodeFieldDesc(TypeInfoFactory.doubleTypeInfo, col1, "b", false); + + colDesc = new ExprNodeColumnDesc(col3Type, "root", "test", false); + ExprNodeDesc col2 = new ExprNodeFieldDesc(col2Type, colDesc, "col2", false); + final List<String> paths = Arrays.asList("_col0"); + + GenericUDF udf = mock(GenericUDFPower.class); + + List<ExprNodeDesc> list = new ArrayList<>(); + list.add(fieldDesc1); + list.add(col2); + ExprNodeDesc funcDesc = new ExprNodeGenericFuncDesc(TypeInfoFactory.doubleTypeInfo, udf, "pow", + list); + + SelectOperator selectOperator = buildSelectOperator(Arrays.asList(funcDesc), paths); + List<String> groups = ctx.getSelectNestedColPathsFromChildren(selectOperator, paths); + assertEquals(new String[] { "root.col1.b", "root.col2" }, groups.toArray(new String[groups + .size()])); + } + + private SelectOperator buildSelectOperator( + List<ExprNodeDesc> colList, + List<String> outputColumnNames) { + SelectOperator selectOperator = mock(SelectOperator.class); + SelectDesc selectDesc = new SelectDesc(colList, outputColumnNames); + selectDesc.setSelStarNoCompute(false); + when(selectOperator.getConf()).thenReturn(selectDesc); + return selectOperator; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java new file mode 100644 index 0000000..df7d83e --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java
new file mode 100644
index 0000000..df7d83e
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestNestedColumnFieldPruningUtils.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class TestNestedColumnFieldPruningUtils {
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        { "root[a]", new String[] { "root.a.b.c" }, "root[a]" },
+        { "root[a[b[d,e]],c]", new String[] { "root.a.b.c" }, "root[a[b[d,e,c]],c]" },
+        { "root[a[b[c]]]", new String[] { "root.a.b.c.d" }, "root[a[b[c]]]" },
+        { null, new String[] { "a.b.c" }, "a[b[c]]" },
+        { null, new String[] { "a.b", "a.c" }, "a[b,c]" },
+        { "a[b]", new String[] { "a.b.c" }, "a[b]" } });
+  }
+
+  @Parameterized.Parameter(value = 0)
+  public String origTreeExpr;
+  @Parameterized.Parameter(value = 1)
+  public String[] paths;
+  @Parameterized.Parameter(value = 2)
+  public String resTreeExpr;
+
+  @org.junit.Test
+  public void testAddNodeByPath() {
+    FieldNode root = null;
+    if (origTreeExpr != null) {
+      root = buildTreeByExpr(origTreeExpr);
+      Assert.assertEquals("The original tree was built incorrectly", root.toString(), origTreeExpr);
+    }
+    for (String p : paths) {
+      root = NestedColumnFieldPruningUtils.addNodeByPath(root, p);
+    }
+    Assert.assertEquals(resTreeExpr, root.toString());
+  }
+
+  private static boolean isSpecialChar(char element) {
+    return (element == '[') || (element == ']') || (element == ',');
+  }
+
+  private static FieldNode buildTreeByExpr(String expr) {
+    int index = 0;
+    LinkedList<FieldNode> fieldStack = new LinkedList<>();
+    while (index < expr.length()) {
+      int i = index;
+      if (isSpecialChar(expr.charAt(i))) {
+        if ((expr.charAt(index) == ',') || (expr.charAt(index) == ']')) {
+          FieldNode node = fieldStack.pop();
+          FieldNode pre = fieldStack.peek();
+          pre.addFieldNodes(node);
+        }
+        index++;
+      } else {
+        while (i < expr.length() && !isSpecialChar(expr.charAt(i))) {
+          i++;
+        }
+        FieldNode current = new FieldNode(expr.substring(index, i));
+        fieldStack.push(current);
+        index = i;
+      }
+    }
+    return fieldStack.pop();
+  }
+}
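For reference, a minimal sketch (not part of the patch) of the bracket notation exercised above: FieldNode.toString() prints a node as name[child,child,...], and addNodeByPath merges a dotted path into the tree, leaving the tree unchanged when the path is already covered (as in the "root[a]" and "a[b]" cases). Only the addNodeByPath signature used in the test is assumed; the class name below and its package (chosen so the call compiles from the same package) are illustrative.

package org.apache.hadoop.hive.ql.optimizer;

public class FieldNodeSketch {
  public static void main(String[] args) {
    // start from an empty tree and merge two dotted paths under the same root column
    FieldNode root = null;
    root = NestedColumnFieldPruningUtils.addNodeByPath(root, "a.b");
    root = NestedColumnFieldPruningUtils.addNodeByPath(root, "a.c");
    System.out.println(root);  // expected "a[b,c]", as in the fifth parameterized case
  }
}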
http://git-wip-us.apache.org/repos/asf/hive/blob/41fbe7bb/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
index 0c7ac30..3978a15 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
@@ -37,9 +37,18 @@ public final class ColumnProjectionUtils {
   public static final Logger LOG = LoggerFactory.getLogger(ColumnProjectionUtils.class);
 
   public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
+  /**
+   * The nested column path is the string from the root to the leaf,
+   * e.g.
+   * c:struct<a:string,b:string>
+   * column a's path is c.a and column b's path is c.b
+   */
+  public static final String READ_NESTED_COLUMN_PATH_CONF_STR =
+      "hive.io.file.readNestedColumn.paths";
   public static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
   public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
   private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = "";
+  private static final String READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT = "";
   private static final boolean READ_ALL_COLUMNS_DEFAULT = true;
   private static final Joiner CSV_JOINER = Joiner.on(",").skipNulls();
 
@@ -113,6 +122,30 @@ public final class ColumnProjectionUtils {
   }
 
   /**
+   * Appends read nested column paths. Once a read nested column path
+   * is included in the list, an underlying record reader of a columnar file format
+   * (e.g. Parquet or ORC) can know which columns are needed.
+   */
+  public static void appendNestedColumnPaths(
+      Configuration conf,
+      List<String> paths) {
+    if (paths == null || paths.isEmpty()) {
+      return;
+    }
+    String pathsStr = StringUtils.join(StringUtils.COMMA_STR,
+        paths.toArray(new String[paths.size()]));
+    String old = conf.get(READ_NESTED_COLUMN_PATH_CONF_STR, null);
+    String newConfStr = pathsStr;
+    if (old != null && !old.isEmpty()) {
+      newConfStr = newConfStr + StringUtils.COMMA_STR + old;
+    }
+    setReadNestedColumnPathConf(conf, newConfStr);
+    // Set READ_ALL_COLUMNS to false
+    conf.setBoolean(READ_ALL_COLUMNS, false);
+  }
+
+
+  /**
    * This method appends read column information to configuration to use for PPD. It is
    * currently called with information from TSOP. Names come from TSOP input RowSchema, and
    * IDs are the indexes inside the schema (which PPD assumes correspond to indexes inside the
@@ -122,13 +155,14 @@ public final class ColumnProjectionUtils {
    * @param names Column names.
    */
   public static void appendReadColumns(
-      Configuration conf, List<Integer> ids, List<String> names) {
+      Configuration conf, List<Integer> ids, List<String> names, List<String> groupPaths) {
     if (ids.size() != names.size()) {
       LOG.warn("Read column counts do not match: " + ids.size() + " ids, " + names.size() +
           " names");
     }
     appendReadColumns(conf, ids);
     appendReadColumnNames(conf, names);
+    appendNestedColumnPaths(conf, groupPaths);
   }
 
   public static void appendReadColumns(
@@ -160,6 +194,20 @@ public final class ColumnProjectionUtils {
     return result;
   }
 
+  public static List<String> getNestedColumnPaths(Configuration conf) {
+    String skips =
+        conf.get(READ_NESTED_COLUMN_PATH_CONF_STR, READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT);
+    String[] list = StringUtils.split(skips);
+    List<String> result = new ArrayList<>(list.length);
+    for (String element : list) {
+      // it may contain duplicates, remove duplicates
+      if (!result.contains(element)) {
+        result.add(element);
+      }
+    }
+    return result;
+  }
+
   public static String[] getReadColumnNames(Configuration conf) {
     String colNames = conf.get(READ_COLUMN_NAMES_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT);
     if (colNames != null && !colNames.isEmpty()) {
@@ -176,6 +224,16 @@ public final class ColumnProjectionUtils {
     }
   }
 
+  private static void setReadNestedColumnPathConf(
+      Configuration conf,
+      String nestedColumnPaths) {
+    if (nestedColumnPaths.trim().isEmpty()) {
+      conf.set(READ_NESTED_COLUMN_PATH_CONF_STR, READ_NESTED_COLUMN_PATH_CONF_STR_DEFAULT);
+    } else {
+      conf.set(READ_NESTED_COLUMN_PATH_CONF_STR, nestedColumnPaths);
+    }
+  }
+
   private static void appendReadColumnNames(Configuration conf, List<String> cols) {
     String old = conf.get(READ_COLUMN_NAMES_CONF_STR, "");
     StringBuilder result = new StringBuilder(old);
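For reference, a minimal round-trip sketch (not part of the patch) of the two public helpers added above: appendNestedColumnPaths stores the comma-joined paths under hive.io.file.readNestedColumn.paths and sets hive.io.file.read.all.columns to false, and getNestedColumnPaths reads them back with duplicates removed. The class name and path values below are illustrative.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

public class NestedColumnPathRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // store the paths; this also flips hive.io.file.read.all.columns to false
    ColumnProjectionUtils.appendNestedColumnPaths(conf,
        Arrays.asList("root.col1.a", "root.col2"));
    // read them back; duplicate entries would be dropped here
    List<String> paths = ColumnProjectionUtils.getNestedColumnPaths(conf);
    System.out.println(paths);  // [root.col1.a, root.col2]
  }
}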
