HIVE-13617: LLAP: support non-vectorized execution in IO (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
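
The gist: instead of refusing LLAP IO for non-vectorized queries, the IO layer now wraps
its vectorized reader in a row-by-row converter (BatchToRowReader), gated by the new
hive.llap.io.nonvector.wrapper.enabled setting. A minimal usage sketch, based on the
orc_llap_nonvector.q test added below (the wrapper setting is spelled out here for
clarity, although it defaults to true):

    SET hive.llap.io.enabled=true;
    SET hive.llap.io.nonvector.wrapper.enabled=true;
    SET hive.vectorized.execution.enabled=false;
    select cint, cstring1 from orc_llap_nonvector limit 1025;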
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b772fed0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b772fed0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b772fed0

Branch: refs/heads/master
Commit: b772fed09ec39e883ff034b806038fd49812d398
Parents: af378c0
Author: Sergey Shelukhin <[email protected]>
Authored: Wed Jun 22 18:04:02 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Jun 22 18:08:20 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    4 +
 .../test/resources/testconfiguration.properties |    3 +-
 .../hive/llap/io/api/impl/LlapInputFormat.java  |  121 +-
 .../apache/hadoop/hive/ql/exec/MapOperator.java |    7 +-
 .../hive/ql/io/BatchToRowInputFormat.java       |   31 +
 .../hadoop/hive/ql/io/BatchToRowReader.java     |  623 +++++++++
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   17 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   14 +-
 .../hive/ql/io/orc/OrcOiBatchToRowReader.java   |   71 +
 .../hive/ql/optimizer/GenMapRedUtils.java       |    6 +-
 .../org/apache/hadoop/hive/ql/plan/MapWork.java |   20 +-
 .../queries/clientpositive/orc_llap_nonvector.q |   42 +
 .../llap/dynamic_partition_pruning.q.out        |    6 +-
 .../llap/hybridgrace_hashjoin_1.q.out           |   24 +-
 .../results/clientpositive/llap/llap_udf.q.out  |    4 +-
 .../clientpositive/llap/llapdecider.q.out       |   24 +-
 .../clientpositive/llap/mapjoin_decimal.q.out   |    4 +-
 .../llap/orc_llap_nonvector.q.out               | 1310 ++++++++++++++++++
 .../clientpositive/llap/orc_ppd_basic.q.out     |   28 +-
 .../llap/tez_bmj_schema_evolution.q.out         |    2 +-
 .../llap/tez_dynpart_hashjoin_1.q.out           |   24 +-
 .../llap/tez_dynpart_hashjoin_2.q.out           |    6 +-
 .../llap/tez_union_group_by.q.out               |   10 +-
 .../llap/tez_vector_dynpart_hashjoin_1.q.out    |   12 +-
 .../llap/tez_vector_dynpart_hashjoin_2.q.out    |    2 +-
 .../vectorized_dynamic_partition_pruning.q.out  |    4 +-
 .../clientpositive/orc_llap_nonvector.q.out     | 1302 +++++++++++++++++
 .../clientpositive/tez/llapdecider.q.out        |   24 +-
 28 files changed, 3620 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3783dc4..08ee23f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2628,7 +2628,11 @@ public class HiveConf extends Configuration {
         "hive.tez.exec.inplace.progress", true,
         "Updates tez job execution progress in-place in the terminal."),
+    // The default is different on the client and server, so it's null here.
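+    // Elsewhere in this patch, MapWork.deriveLlap reads it with an explicit fallback:
+    // HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, llapMode), i.e. when unset,
+    // planning assumes the IO layer is on exactly when the query runs in LLAP mode.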
LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."), + LLAP_IO_NONVECTOR_WRAPPER_ENABLED("hive.llap.io.nonvector.wrapper.enabled", true, + "Whether the LLAP IO layer is enabled for non-vectorized queries that read inputs\n" + + "that can be vectorized"), LLAP_IO_MEMORY_MODE("hive.llap.io.memory.mode", "cache", new StringSet("cache", "allocator", "none"), "LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 2b40cd9..2f04881 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -524,7 +524,8 @@ minillap.shared.query.files=bucket_map_join_tez1.q,\ tez_union_multiinsert.q -minillap.query.files=llap_udf.q +minillap.query.files=llap_udf.q,\ + orc_llap_nonvector.q encrypted.query.files=encryption_join_unencrypted_tbl.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index 298f788..b945de3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -19,19 +19,33 @@ package org.apache.hadoop.hive.llap.io.api.impl; +import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat; + +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; + +import com.google.common.base.Joiner; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.llap.ConsumerFeedback; -import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; import org.apache.hadoop.hive.llap.io.decode.ReadPipeline; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -40,11 +54,17 @@ import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination import org.apache.hadoop.hive.ql.io.LlapAwareSplit; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import 
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcOiBatchToRowReader; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; @@ -57,16 +77,12 @@ import org.apache.tez.common.counters.TezCounters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; - public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface, SelfDescribingInputFormatInterface, AvoidSplitCombination { + private static final String NONVECTOR_SETTING_MESSAGE = "disable " + + ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED.varname + " to work around this error"; + @SuppressWarnings("rawtypes") private final InputFormat sourceInputFormat; private final AvoidSplitCombination sourceASC; @@ -95,29 +111,35 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB if (split instanceof LlapAwareSplit) { useLlapIo = ((LlapAwareSplit)split).canUseLlapIo(); } + boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job); if (!useLlapIo) { LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split); - @SuppressWarnings("unchecked") - RecordReader<NullWritable, VectorizedRowBatch> rr = - sourceInputFormat.getRecordReader(split, job, reporter); - return rr; - } - boolean isVectorMode = Utilities.getUseVectorizedInputFileFormat(job); - if (!isVectorMode) { - LlapIoImpl.LOG.error("No LLAP IO in non-vectorized mode"); - throw new UnsupportedOperationException("No LLAP IO in non-vectorized mode"); + return sourceInputFormat.getRecordReader(split, job, reporter); } FileSplit fileSplit = (FileSplit)split; reporter.setStatus(fileSplit.toString()); try { List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job) ? null : ColumnProjectionUtils.getReadColumnIDs(job); - return new LlapRecordReader(job, fileSplit, includedCols, hostName); + LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName); + if (isVectorized) return rr; + if (sourceInputFormat instanceof BatchToRowInputFormat) { + return bogusCast(((BatchToRowInputFormat)sourceInputFormat).getWrapper( + rr, rr.getVectorizedRowBatchCtx(), includedCols)); + } + return sourceInputFormat.getRecordReader(split, job, reporter); } catch (Exception ex) { throw new IOException(ex); } } + // Returning either a vectorized or non-vectorized reader from the same call requires breaking + // generics... this is how vectorization currently works. 
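+  // Java erases generics at compile time, so this double cast is a runtime no-op that
+  // only silences the compiler; the caller is trusted to consume the reader with the
+  // key/value types it actually produces.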
+ @SuppressWarnings("unchecked") + private static <A, B, C, D> RecordReader<A, B> bogusCast(RecordReader<C, D> rr) { + return (RecordReader<A, B>)rr; + } + @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { return sourceInputFormat.getSplits(job, numSplits); @@ -146,9 +168,8 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB private final QueryFragmentCounters counters; private long firstReturnTime; - public LlapRecordReader( - JobConf job, FileSplit split, List<Integer> includedCols, String hostName) - throws IOException { + public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols, + String hostName) throws IOException, HiveException { this.split = split; this.columnIds = includedCols; this.sarg = ConvertAstToSearchArg.createFromConf(job); @@ -171,7 +192,8 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName); MapWork mapWork = Utilities.getMapWork(job); - rbCtx = mapWork.getVectorizedRowBatchCtx(); + VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx(); + rbCtx = ctx != null ? ctx : createFakeVrbCtx(mapWork); columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(job); @@ -232,6 +254,9 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB return true; } + public VectorizedRowBatchCtx getVectorizedRowBatchCtx() { + return rbCtx; + } private final class UncaughtErrorHandler implements FutureCallback<Void> { @Override @@ -371,4 +396,54 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException { return sourceASC == null ? false : sourceASC.shouldSkipCombine(path, conf); } + + private static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException { + // This is based on Vectorizer code, minus the validation. + + // Add all non-virtual columns from the TableScan operator. + RowSchema rowSchema = findTsOp(mapWork).getSchema(); + final List<String> colNames = new ArrayList<String>(rowSchema.getSignature().size()); + final List<TypeInfo> colTypes = new ArrayList<TypeInfo>(rowSchema.getSignature().size()); + for (ColumnInfo c : rowSchema.getSignature()) { + String columnName = c.getInternalName(); + if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(columnName)) continue; + colNames.add(columnName); + colTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(c.getTypeName())); + } + + // Determine the partition columns using the first partition descriptor. + // Note - like vectorizer, this assumes partition columns go after data columns. 
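+    // For example, a table partitioned by (ds string) produces batches whose data
+    // columns come first and whose last column is ds, giving a count of 1 here.
+    // (Hypothetical partition spec, for illustration only.)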
+    int partitionColumnCount = 0;
+    Iterator<String> paths = mapWork.getPathToAliases().keySet().iterator();
+    if (paths.hasNext()) {
+      PartitionDesc partDesc = mapWork.getPathToPartitionInfo().get(paths.next());
+      if (partDesc != null) {
+        LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+        if (partSpec != null && !partSpec.isEmpty()) {
+          partitionColumnCount = partSpec.size();
+        }
+      }
+    }
+    return new VectorizedRowBatchCtx(colNames.toArray(new String[colNames.size()]),
+        colTypes.toArray(new TypeInfo[colTypes.size()]), partitionColumnCount, new String[0]);
+  }
+
+  static TableScanOperator findTsOp(MapWork mapWork) throws HiveException {
+    if (mapWork.getAliasToWork() == null) {
+      throw new HiveException("Unexpected - aliasToWork is missing; " + NONVECTOR_SETTING_MESSAGE);
+    }
+    Iterator<Operator<?>> ops = mapWork.getAliasToWork().values().iterator();
+    TableScanOperator tableScanOperator = null;
+    while (ops.hasNext()) {
+      Operator<?> op = ops.next();
+      if (op instanceof TableScanOperator) {
+        if (tableScanOperator != null) {
+          throw new HiveException("Unexpected - more than one TSOP; " + NONVECTOR_SETTING_MESSAGE);
+        }
+        tableScanOperator = (TableScanOperator)op;
+      }
+    }
+    return tableScanOperator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index afe5ee2..fabfbc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -491,7 +491,12 @@ public class MapOperator extends AbstractMapOperator {
       }
     } catch (Exception e) {
       // TODO: policy on deserialization errors
-      String message = toErrorMessage(value, row, current.rowObjectInspector);
+      String message = null;
+      try {
+        message = toErrorMessage(value, row, current.rowObjectInspector);
+      } catch (Throwable t) {
+        message = "[" + row + ", " + value + "]: cannot get error message " + t.getMessage();
+      }
       if (row == null) {
         deserialize_error_count.set(deserialize_error_count.get() + 1);
         throw new HiveException("Hive Runtime Error while processing writable " + message, e);

http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowInputFormat.java
new file mode 100644
index 0000000..f958db5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowInputFormat.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import org.apache.hadoop.io.NullWritable; + +import java.util.List; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.mapred.RecordReader; + +public interface BatchToRowInputFormat { + BatchToRowReader<?, ?> getWrapper(RecordReader<NullWritable, VectorizedRowBatch> vrr, + VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols); +} http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java new file mode 100644 index 0000000..f350302 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BatchToRowReader.java @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io; + + +import com.google.common.collect.Lists; + +import org.apache.hadoop.hive.llap.DebugUtils; + +import java.util.Arrays; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveCharWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.RecordReader; + +/** + * A record reader wrapper that converts VRB reader into an OI-based reader. + * Due to the fact that changing table OIs in the plan after compilation is nearly impossible, + * this is made an abstract class where type-specific implementations can plug in certain details, + * so that the data produced after wrapping a vectorized reader would conform to the original OIs. 
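+ * For example, the ORC implementation in this patch, OrcOiBatchToRowReader, plugs in
+ * OrcStruct and OrcUnion, so rows read through LLAP IO look the same to downstream
+ * operators as rows from the non-vectorized ORC reader.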
+ */ +public abstract class BatchToRowReader<StructType, UnionType> + implements RecordReader<NullWritable, Object> { + protected static final Logger LOG = LoggerFactory.getLogger(BatchToRowReader.class); + + private final NullWritable key; + private final VectorizedRowBatch batch; + private final RecordReader<NullWritable, VectorizedRowBatch> vrbReader; + + private final List<TypeInfo> schema; + private final boolean[] included; + private int rowInBatch = 0; + + public BatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> vrbReader, + VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) { + this.vrbReader = vrbReader; + this.key = vrbReader.createKey(); + this.batch = vrbReader.createValue(); + this.schema = Lists.<TypeInfo>newArrayList(vrbCtx.getRowColumnTypeInfos()); + // TODO: does this include partition columns? + boolean[] included = new boolean[schema.size()]; + if (includedCols != null) { + for (int colIx : includedCols) { + included[colIx] = true; + } + } else { + Arrays.fill(included, true); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Including the columns " + DebugUtils.toString(included)); + } + this.included = included; + } + + protected abstract StructType createStructObject(Object previous, List<TypeInfo> childrenTypes); + protected abstract void setStructCol(StructType structObj, int i, Object value); + protected abstract Object getStructCol(StructType structObj, int i); + protected abstract UnionType createUnionObject(List<TypeInfo> childrenTypes, Object previous); + protected abstract void setUnion(UnionType unionObj, byte tag, Object object); + protected abstract Object getUnionField(UnionType unionObj); + + @Override + public NullWritable createKey() { + return key; + } + + @Override + public Object createValue() { + return createStructObject(null, schema); + } + + @Override + public long getPos() throws IOException { + return -1; + } + + @Override + public float getProgress() throws IOException { + return 0; + } + + @Override + public boolean next(NullWritable key, Object previous) throws IOException { + if (!ensureBatch()) { + return false; + } + @SuppressWarnings("unchecked") + StructType value = (StructType)previous; + for (int i = 0; i < schema.size(); ++i) { + if (!included[i]) continue; // TODO: shortcut for last col below length? + try { + setStructCol(value, i, + nextValue(batch.cols[i], rowInBatch, schema.get(i), getStructCol(value, i))); + } catch (Throwable t) { + LOG.error("Error at row " + rowInBatch + "/" + batch.size + ", column " + i + + "/" + schema.size() + " " + batch.cols[i], t); + throw (t instanceof IOException) ? (IOException)t : new IOException(t); + } + } + ++rowInBatch; + return true; + } + + /** + * If the current batch is empty, get a new one. + * @return true if we have rows available. 
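+   *         A batch of size zero from the underlying reader also ends the iteration.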
+ */ + private boolean ensureBatch() throws IOException { + if (rowInBatch >= batch.size) { + rowInBatch = 0; + return vrbReader.next(key, batch) && batch.size > 0; + } + return true; + } + + + @Override + public void close() throws IOException { + vrbReader.close(); + batch.cols = null; + } + + /* Routines for stubbing into Writables */ + + public static BooleanWritable nextBoolean(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + BooleanWritable result; + if (previous == null || previous.getClass() != BooleanWritable.class) { + result = new BooleanWritable(); + } else { + result = (BooleanWritable) previous; + } + result.set(((LongColumnVector) vector).vector[row] != 0); + return result; + } else { + return null; + } + } + + public static ByteWritable nextByte(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + ByteWritable result; + if (previous == null || previous.getClass() != ByteWritable.class) { + result = new ByteWritable(); + } else { + result = (ByteWritable) previous; + } + result.set((byte) ((LongColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + public static ShortWritable nextShort(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + ShortWritable result; + if (previous == null || previous.getClass() != ShortWritable.class) { + result = new ShortWritable(); + } else { + result = (ShortWritable) previous; + } + result.set((short) ((LongColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + public static IntWritable nextInt(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + IntWritable result; + if (previous == null || previous.getClass() != IntWritable.class) { + result = new IntWritable(); + } else { + result = (IntWritable) previous; + } + result.set((int) ((LongColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + public static LongWritable nextLong(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + LongWritable result; + if (previous == null || previous.getClass() != LongWritable.class) { + result = new LongWritable(); + } else { + result = (LongWritable) previous; + } + result.set(((LongColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + public static FloatWritable nextFloat(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + FloatWritable result; + if (previous == null || previous.getClass() != FloatWritable.class) { + result = new FloatWritable(); + } else { + result = (FloatWritable) previous; + } + result.set((float) ((DoubleColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + public static DoubleWritable nextDouble(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + DoubleWritable result; + if (previous == null || previous.getClass() != DoubleWritable.class) { + result = new DoubleWritable(); + } else { + result = (DoubleWritable) 
previous; + } + result.set(((DoubleColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + public static Text nextString(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + Text result; + if (previous == null || previous.getClass() != Text.class) { + result = new Text(); + } else { + result = (Text) previous; + } + BytesColumnVector bytes = (BytesColumnVector) vector; + result.set(bytes.vector[row], bytes.start[row], bytes.length[row]); + return result; + } else { + return null; + } + } + + public static HiveCharWritable nextChar(ColumnVector vector, + int row, + int size, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + HiveCharWritable result; + if (previous == null || previous.getClass() != HiveCharWritable.class) { + result = new HiveCharWritable(); + } else { + result = (HiveCharWritable) previous; + } + BytesColumnVector bytes = (BytesColumnVector) vector; + result.set(bytes.toString(row), size); + return result; + } else { + return null; + } + } + + public static HiveVarcharWritable nextVarchar( + ColumnVector vector, int row, int size, Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + HiveVarcharWritable result; + if (previous == null || previous.getClass() != HiveVarcharWritable.class) { + result = new HiveVarcharWritable(); + } else { + result = (HiveVarcharWritable) previous; + } + BytesColumnVector bytes = (BytesColumnVector) vector; + result.set(bytes.toString(row), size); + return result; + } else { + return null; + } + } + + public static BytesWritable nextBinary(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + BytesWritable result; + if (previous == null || previous.getClass() != BytesWritable.class) { + result = new BytesWritable(); + } else { + result = (BytesWritable) previous; + } + BytesColumnVector bytes = (BytesColumnVector) vector; + result.set(bytes.vector[row], bytes.start[row], bytes.length[row]); + return result; + } else { + return null; + } + } + + public static HiveDecimalWritable nextDecimal(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + HiveDecimalWritable result; + if (previous == null || previous.getClass() != HiveDecimalWritable.class) { + result = new HiveDecimalWritable(); + } else { + result = (HiveDecimalWritable) previous; + } + result.set(((DecimalColumnVector) vector).vector[row]); + return result; + } else { + return null; + } + } + + public static DateWritable nextDate(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + DateWritable result; + if (previous == null || previous.getClass() != DateWritable.class) { + result = new DateWritable(); + } else { + result = (DateWritable) previous; + } + int date = (int) ((LongColumnVector) vector).vector[row]; + result.set(date); + return result; + } else { + return null; + } + } + + public static TimestampWritable nextTimestamp(ColumnVector vector, + int row, + Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + TimestampWritable result; + if (previous == null || previous.getClass() != TimestampWritable.class) { + 
result = new TimestampWritable(); + } else { + result = (TimestampWritable) previous; + } + TimestampColumnVector tcv = (TimestampColumnVector) vector; + result.setInternal(tcv.time[row], tcv.nanos[row]); + return result; + } else { + return null; + } + } + + public StructType nextStruct( + ColumnVector vector, int row, StructTypeInfo schema, Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + List<TypeInfo> childrenTypes = schema.getAllStructFieldTypeInfos(); + StructType result = createStructObject(previous, childrenTypes); + StructColumnVector struct = (StructColumnVector) vector; + for (int f = 0; f < childrenTypes.size(); ++f) { + setStructCol(result, f, nextValue(struct.fields[f], row, + childrenTypes.get(f), getStructCol(result, f))); + } + return result; + } else { + return null; + } + } + + private UnionType nextUnion( + ColumnVector vector, int row, UnionTypeInfo schema, Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + List<TypeInfo> childrenTypes = schema.getAllUnionObjectTypeInfos(); + UnionType result = createUnionObject(childrenTypes, previous); + UnionColumnVector union = (UnionColumnVector) vector; + byte tag = (byte) union.tags[row]; + setUnion(result, tag, nextValue(union.fields[tag], row, childrenTypes.get(tag), + getUnionField(result))); + return result; + } else { + return null; + } + } + + private ArrayList<Object> nextList( + ColumnVector vector, int row, ListTypeInfo schema, Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + ArrayList<Object> result; + if (previous == null || previous.getClass() != ArrayList.class) { + result = new ArrayList<>(); + } else { + result = (ArrayList<Object>) previous; + } + ListColumnVector list = (ListColumnVector) vector; + int length = (int) list.lengths[row]; + int offset = (int) list.offsets[row]; + result.ensureCapacity(length); + int oldLength = result.size(); + int idx = 0; + TypeInfo childType = schema.getListElementTypeInfo(); + while (idx < length && idx < oldLength) { + result.set(idx, nextValue(list.child, offset + idx, childType, + result.get(idx))); + idx += 1; + } + if (length < oldLength) { + for(int i= oldLength - 1; i >= length; --i) { + result.remove(i); + } + } else if (oldLength < length) { + while (idx < length) { + result.add(nextValue(list.child, offset + idx, childType, null)); + idx += 1; + } + } + return result; + } else { + return null; + } + } + + private HashMap<Object,Object> nextMap( + ColumnVector vector, int row, MapTypeInfo schema, Object previous) { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + MapColumnVector map = (MapColumnVector) vector; + int length = (int) map.lengths[row]; + int offset = (int) map.offsets[row]; + TypeInfo keyType = schema.getMapKeyTypeInfo(); + TypeInfo valueType = schema.getMapValueTypeInfo(); + HashMap<Object,Object> result; + if (previous == null || previous.getClass() != HashMap.class) { + result = new HashMap<Object,Object>(length); + } else { + result = (HashMap<Object,Object>) previous; + // I couldn't think of a good way to reuse the keys and value objects + // without even more allocations, so take the easy and safe approach. 
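+      // (HashMap.clear() nulls out the entries but keeps the backing array, so the
+      // bucket table itself is still reused across rows.)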
+ result.clear(); + } + for(int e=0; e < length; ++e) { + result.put(nextValue(map.keys, e + offset, keyType, null), + nextValue(map.values, e + offset, valueType, null)); + } + return result; + } else { + return null; + } + } + + private Object nextValue(ColumnVector vector, int row, TypeInfo schema, Object previous) { + switch (schema.getCategory()) { + case STRUCT: + return nextStruct(vector, row, (StructTypeInfo)schema, previous); + case UNION: + return nextUnion(vector, row, (UnionTypeInfo)schema, previous); + case LIST: + return nextList(vector, row, (ListTypeInfo)schema, previous); + case MAP: + return nextMap(vector, row, (MapTypeInfo)schema, previous); + case PRIMITIVE: { + PrimitiveTypeInfo pschema = (PrimitiveTypeInfo)schema; + switch (pschema.getPrimitiveCategory()) { + case BOOLEAN: + return nextBoolean(vector, row, previous); + case BYTE: + return nextByte(vector, row, previous); + case SHORT: + return nextShort(vector, row, previous); + case INT: + return nextInt(vector, row, previous); + case LONG: + return nextLong(vector, row, previous); + case FLOAT: + return nextFloat(vector, row, previous); + case DOUBLE: + return nextDouble(vector, row, previous); + case STRING: + return nextString(vector, row, previous); + case CHAR: + return nextChar(vector, row, ((CharTypeInfo)pschema).getLength(), previous); + case VARCHAR: + return nextVarchar(vector, row, ((VarcharTypeInfo)pschema).getLength(), previous); + case BINARY: + return nextBinary(vector, row, previous); + case DECIMAL: + return nextDecimal(vector, row, previous); + case DATE: + return nextDate(vector, row, previous); + case TIMESTAMP: + return nextTimestamp(vector, row, previous); + default: + throw new IllegalArgumentException("Unknown type " + schema); + } + } + default: + throw new IllegalArgumentException("Unknown type " + schema); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index cbacc25..227a051 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql.io; +import java.util.Arrays; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -207,6 +210,11 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface; boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(conf); + if (!isVectorized) { + // Pretend it's vectorized. + isVectorized = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED) + && (Utilities.getPlanPath(conf) != null); + } if (!isSupported || !isVectorized) { LOG.info("Not using llap for " + inputFormat + ": supported = " + isSupported + ", vectorized = " + isVectorized); @@ -224,12 +232,11 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> return castInputFormat(llapIo.getInputFormat(inputFormat)); } - public static boolean canWrapAnyForLlap(Configuration conf, MapWork mapWork) { - return Utilities.getUseVectorizedInputFileFormat(conf, mapWork); - } - public static boolean canWrapForLlap(Class<? 
extends InputFormat> inputFormatClass) { - return LlapWrappableInputFormatInterface.class.isAssignableFrom(inputFormatClass); + + public static boolean canWrapForLlap(Class<? extends InputFormat> clazz, boolean checkVector) { + return LlapWrappableInputFormatInterface.class.isAssignableFrom(clazz) && + (!checkVector || BatchToRowInputFormat.class.isAssignableFrom(clazz)); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 070950c..69d58d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -74,10 +74,14 @@ import org.apache.hadoop.hive.metastore.Metastore.SplitInfos; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.AcidUtils.Directory; +import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat; +import org.apache.hadoop.hive.ql.io.BatchToRowReader; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; @@ -141,7 +145,7 @@ import com.google.protobuf.CodedInputStream; public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, InputFormatChecker, VectorizedInputFormatInterface, LlapWrappableInputFormatInterface, SelfDescribingInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>, - CombineHiveInputFormat.AvoidSplitCombination { + CombineHiveInputFormat.AvoidSplitCombination, BatchToRowInputFormat { static enum SplitStrategyKind { HYBRID, @@ -2188,4 +2192,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, protected ExternalFooterCachesByConf createExternalCaches() { return null; // The default ones are created in case of null; tests override this. } + + + @Override + public BatchToRowReader<?, ?> getWrapper( + org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> vrr, + VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) { + return new OrcOiBatchToRowReader(vrr, vrbCtx, includedCols); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java new file mode 100644 index 0000000..600455e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOiBatchToRowReader.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.util.List; + +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.ql.io.BatchToRowReader; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.RecordReader; + +/** BatchToRowReader that returns the rows readable by ORC IOs. */ +public class OrcOiBatchToRowReader extends BatchToRowReader<OrcStruct, OrcUnion> { + public OrcOiBatchToRowReader(RecordReader<NullWritable, VectorizedRowBatch> vrbReader, + VectorizedRowBatchCtx vrbCtx, List<Integer> includedCols) { + super(vrbReader, vrbCtx, includedCols); + } + + @Override + protected OrcStruct createStructObject(Object previous, List<TypeInfo> childrenTypes) { + int numChildren = childrenTypes.size(); + if (!(previous instanceof OrcStruct)) { + return new OrcStruct(numChildren); + } + OrcStruct result = (OrcStruct) previous; + result.setNumFields(numChildren); + return result; + } + + @Override + protected OrcUnion createUnionObject(List<TypeInfo> childrenTypes, Object previous) { + return (previous instanceof OrcUnion) ? 
(OrcUnion)previous : new OrcUnion(); + } + + @Override + protected void setStructCol(OrcStruct structObj, int i, Object value) { + structObj.setFieldValue(i, value); + } + + @Override + protected Object getStructCol(OrcStruct structObj, int i) { + return structObj.getFieldValue(i); + } + + @Override + protected Object getUnionField(OrcUnion unionObj) { + return unionObj.getObject(); + } + + @Override + protected void setUnion(OrcUnion unionObj, byte tag, Object object) { + unionObj.set(tag, object); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 7595065..f555741 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -907,19 +907,19 @@ public final class GenMapRedUtils { } } else if (task instanceof ExecDriver) { MapredWork work = (MapredWork) task.getWork(); - work.getMapWork().deriveLlap(conf); + work.getMapWork().deriveLlap(conf, true); } else if (task != null && (task.getWork() instanceof TezWork)) { TezWork work = (TezWork)task.getWork(); for (BaseWork w : work.getAllWorkUnsorted()) { if (w instanceof MapWork) { - ((MapWork)w).deriveLlap(conf); + ((MapWork)w).deriveLlap(conf, false); } } } else if (task instanceof SparkTask) { SparkWork work = (SparkWork) task.getWork(); for (BaseWork w : work.getAllWorkUnsorted()) { if (w instanceof MapWork) { - ((MapWork) w).deriveLlap(conf); + ((MapWork) w).deriveLlap(conf, false); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index f034812..d908d48 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.ql.exec.Utilities; + import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -209,16 +211,26 @@ public class MapWork extends BaseWork { } } - public void deriveLlap(Configuration conf) { + public void deriveLlap(Configuration conf, boolean isExecDriver) { boolean hasLlap = false, hasNonLlap = false, hasAcid = false; // Assume the IO is enabled on the daemon by default. We cannot reasonably check it here. - boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, llapMode), - canWrapAny = isLlapOn && HiveInputFormat.canWrapAnyForLlap(conf, this); + boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, llapMode); + boolean canWrapAny = false, doCheckIfs = false; + if (isLlapOn) { + // We can wrap inputs if the execution is vectorized, or if we use a wrapper. + canWrapAny = Utilities.getUseVectorizedInputFileFormat(conf, this); + // ExecDriver has no plan path, so we cannot derive VRB stuff for the wrapper. 
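+      // (This mirrors the HiveInputFormat change above, which only pretends the input
+      // is vectorized when Utilities.getPlanPath(conf) != null.)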
+ if (!canWrapAny && !isExecDriver) { + canWrapAny = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED); + doCheckIfs = true; + } + } boolean hasPathToPartInfo = (pathToPartitionInfo != null && !pathToPartitionInfo.isEmpty()); if (canWrapAny && hasPathToPartInfo) { assert isLlapOn; for (PartitionDesc part : pathToPartitionInfo.values()) { - boolean isUsingLlapIo = HiveInputFormat.canWrapForLlap(part.getInputFileFormatClass()); + boolean isUsingLlapIo = HiveInputFormat.canWrapForLlap( + part.getInputFileFormatClass(), doCheckIfs); if (isUsingLlapIo) { if (part.getTableDesc() != null && AcidUtils.isTablePropertyTransactional(part.getTableDesc().getProperties())) { http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/queries/clientpositive/orc_llap_nonvector.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/orc_llap_nonvector.q b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q new file mode 100644 index 0000000..1fc60b2 --- /dev/null +++ b/ql/src/test/queries/clientpositive/orc_llap_nonvector.q @@ -0,0 +1,42 @@ +set hive.mapred.mode=nonstrict; +SET hive.vectorized.execution.enabled=true; + +SET hive.llap.io.enabled=false; + +SET hive.exec.orc.default.buffer.size=32768; +SET hive.exec.orc.default.row.index.stride=1000; +SET hive.optimize.index.filter=true; +set hive.auto.convert.join=false; +set hive.fetch.task.conversion=none; + +DROP TABLE orc_create_staging; +DROP TABLE orc_create_complex; +DROP TABLE orc_llap_nonvector; + + +CREATE TABLE orc_create_staging ( + str STRING, + mp MAP<STRING,STRING>, + lst ARRAY<STRING>, + strct STRUCT<A:STRING,B:STRING> +) ROW FORMAT DELIMITED + FIELDS TERMINATED BY '|' + COLLECTION ITEMS TERMINATED BY ',' + MAP KEYS TERMINATED BY ':'; +LOAD DATA LOCAL INPATH '../../data/files/orc_create.txt' OVERWRITE INTO TABLE orc_create_staging; + +create table orc_llap_nonvector stored as orc as select *, rand(1234) rdm from alltypesorc order by rdm; + +SET hive.llap.io.enabled=true; +set hive.auto.convert.join=true; +SET hive.vectorized.execution.enabled=false; + +explain +select * from orc_llap_nonvector limit 100; +select * from orc_llap_nonvector limit 100; +explain +select cint, cstring1 from orc_llap_nonvector limit 1025; +select cint, cstring1 from orc_llap_nonvector limit 1025; + +DROP TABLE orc_create_staging; +DROP TABLE orc_llap_nonvector; http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out index 912cd7f..e75bec6 100644 --- a/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out +++ b/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out @@ -3169,7 +3169,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE value expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: unknown Map 5 Map Operator Tree: TableScan @@ -5557,7 +5557,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE value expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: unknown Map 2 Map Operator Tree: TableScan @@ -5940,7 +5940,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 3 Map Operator Tree: TableScan http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out b/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out index ad3a358..3e96268 100644 --- a/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out +++ b/ql/src/test/results/clientpositive/llap/hybridgrace_hashjoin_1.q.out @@ -83,7 +83,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 3 Map Operator Tree: TableScan @@ -102,7 +102,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -208,7 +208,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 3 Map Operator Tree: TableScan @@ -227,7 +227,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -329,7 +329,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 3 Map Operator Tree: TableScan @@ -348,7 +348,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -450,7 +450,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 3 Map Operator Tree: TableScan @@ -469,7 +469,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -566,7 +566,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 3 Map Operator Tree: TableScan @@ -582,7 +582,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -679,7 +679,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) Execution mode: llap - LLAP IO: 
no inputs + LLAP IO: all inputs Map 3 Map Operator Tree: TableScan @@ -695,7 +695,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/llap_udf.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/llap_udf.q.out b/ql/src/test/results/clientpositive/llap/llap_udf.q.out index e517942..53801d8 100644 --- a/ql/src/test/results/clientpositive/llap/llap_udf.q.out +++ b/ql/src/test/results/clientpositive/llap/llap_udf.q.out @@ -116,7 +116,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Stage: Stage-0 Fetch Operator @@ -206,7 +206,7 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Stage: Stage-0 Fetch Operator http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/llapdecider.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/llapdecider.q.out b/ql/src/test/results/clientpositive/llap/llapdecider.q.out index 27ab92a..3ebaafb 100644 --- a/ql/src/test/results/clientpositive/llap/llapdecider.q.out +++ b/ql/src/test/results/clientpositive/llap/llapdecider.q.out @@ -193,7 +193,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 88000 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -276,7 +276,7 @@ STAGE PLANS: Statistics: Num rows: 205 Data size: 19475 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -348,7 +348,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 4 Map Operator Tree: TableScan @@ -438,7 +438,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 4 Map Operator Tree: TableScan @@ -458,7 +458,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -532,7 +532,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 4 Map Operator Tree: TableScan @@ -800,7 +800,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE value 
expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 4 Map Operator Tree: TableScan @@ -820,7 +820,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Reduce Operator Tree: Merge Join Operator @@ -980,7 +980,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 4 Map Operator Tree: TableScan @@ -1000,7 +1000,7 @@ STAGE PLANS: Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col1 (type: string) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1082,7 +1082,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col0 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: @@ -1259,7 +1259,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE value expressions: _col0 (type: bigint) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree: http://git-wip-us.apache.org/repos/asf/hive/blob/b772fed0/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out b/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out index 2d1b818..01f1f80 100644 --- a/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out +++ b/ql/src/test/results/clientpositive/llap/mapjoin_decimal.q.out @@ -123,7 +123,7 @@ STAGE PLANS: Statistics: Num rows: 1153 Data size: 129236 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: decimal(4,0)) Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Map 3 Map Operator Tree: TableScan @@ -142,7 +142,7 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: decimal(6,2)) Statistics: Num rows: 1049 Data size: 117488 Basic stats: COMPLETE Column stats: NONE Execution mode: llap - LLAP IO: no inputs + LLAP IO: all inputs Reducer 2 Execution mode: llap Reduce Operator Tree:
