http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java deleted file mode 100644 index faec0aa..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRowSameBatch.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector; - -import org.apache.hadoop.hive.ql.metadata.HiveException; - -/** - * This class extracts specified VectorizedRowBatch row columns into a Writable row Object[]. - * - * The caller provides the hive type names and target column numbers in the order desired to - * extract from the Writable row Object[]. - * - * This class is for use when the batch being assigned is always the same. 
- */ -public class VectorExtractRowSameBatch extends VectorExtractRow { - - public void setOneBatch(VectorizedRowBatch batch) throws HiveException { - setBatch(batch); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java index a3082c3..ff88b85 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java @@ -43,7 +43,7 @@ public class VectorFileSinkOperator extends FileSinkOperator { private transient boolean firstBatch; - private transient VectorExtractRowDynBatch vectorExtractRowDynBatch; + private transient VectorExtractRow vectorExtractRow; protected transient Object[] singleRow; @@ -80,30 +80,26 @@ public class VectorFileSinkOperator extends FileSinkOperator { public void process(Object data, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) data; if (firstBatch) { - vectorExtractRowDynBatch = new VectorExtractRowDynBatch(); - vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); + vectorExtractRow = new VectorExtractRow(); + vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); - singleRow = new Object[vectorExtractRowDynBatch.getCount()]; + singleRow = new Object[vectorExtractRow.getCount()]; firstBatch = false; } - vectorExtractRowDynBatch.setBatchOnEntry(batch); - if (batch.selectedInUse) { int selected[] = batch.selected; for (int logical = 0 ; logical < batch.size; logical++) { int batchIndex = selected[logical]; - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); super.process(singleRow, tag); } } else { for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) { - 
vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); super.process(singleRow, tag); } } - - vectorExtractRowDynBatch.forgetBatchOnExit(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index f20f614..98a9bf6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -103,7 +103,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements private transient VectorizedRowBatch outputBatch; private transient VectorizedRowBatchCtx vrbCtx; - private transient VectorAssignRowSameBatch vectorAssignRowSameBatch; + private transient VectorAssignRow vectorAssignRow; private transient int numEntriesHashTable; @@ -823,9 +823,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements vrbCtx = new VectorizedRowBatchCtx(); vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames()); outputBatch = vrbCtx.createVectorizedRowBatch(); - vectorAssignRowSameBatch = new VectorAssignRowSameBatch(); - vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns()); - vectorAssignRowSameBatch.setOneBatch(outputBatch); + vectorAssignRow = new VectorAssignRow(); + vectorAssignRow.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns()); } } catch (HiveException he) { @@ -912,11 +911,11 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements } else { // Output keys and aggregates into the output batch. 
for (int i = 0; i < outputKeyLength; ++i) { - vectorAssignRowSameBatch.assignRowColumn(outputBatch.size, fi++, + vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++, keyWrappersBatch.getWritableKeyValue (kw, i, keyOutputWriters[i])); } for (int i = 0; i < aggregators.length; ++i) { - vectorAssignRowSameBatch.assignRowColumn(outputBatch.size, fi++, + vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++, aggregators[i].evaluateOutput(agg.getAggregationBuffer(i))); } ++outputBatch.size; @@ -937,7 +936,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements throws HiveException { int fi = outputKeyLength; // Start after group keys. for (int i = 0; i < aggregators.length; ++i) { - vectorAssignRowSameBatch.assignRowColumn(outputBatch.size, fi++, + vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++, aggregators[i].evaluateOutput(agg.getAggregationBuffer(i))); } ++outputBatch.size; http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java index 6bed52f..902a183 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java @@ -59,7 +59,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector protected transient VectorizedRowBatch outputBatch; protected transient VectorizedRowBatch scratchBatch; // holds restored (from disk) big table rows - protected transient Map<ObjectInspector, VectorAssignRowSameBatch> outputVectorAssignRowMap; + protected transient Map<ObjectInspector, VectorAssignRow> outputVectorAssignRowMap; protected 
transient VectorizedRowBatchCtx vrbCtx = null; @@ -100,7 +100,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector outputBatch = vrbCtx.createVectorizedRowBatch(); - outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRowSameBatch>(); + outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRow>(); } /** @@ -109,15 +109,14 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector @Override protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException { Object[] values = (Object[]) row; - VectorAssignRowSameBatch va = outputVectorAssignRowMap.get(outputOI); + VectorAssignRow va = outputVectorAssignRowMap.get(outputOI); if (va == null) { - va = new VectorAssignRowSameBatch(); + va = new VectorAssignRow(); va.init((StructObjectInspector) outputOI, vOutContext.getProjectedColumns()); - va.setOneBatch(outputBatch); outputVectorAssignRowMap.put(outputOI, va); } - va.assignRow(outputBatch.size, values); + va.assignRow(outputBatch, outputBatch.size, values); ++outputBatch.size; if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index e8f4471..3323df3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -181,7 +181,9 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator { joinValues[posBigTable] = vectorNodeEvaluators; // Filtering is handled in the input batch processing - filterMaps[posBigTable] = null; + if (filterMaps != null) { + 
filterMaps[posBigTable] = null; + } } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java index 0fe1188..22bebb0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java @@ -44,7 +44,7 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato private transient boolean firstBatch; - private transient VectorExtractRowDynBatch vectorExtractRowDynBatch; + private transient VectorExtractRow vectorExtractRow; protected transient Object[] singleRow; @@ -94,33 +94,28 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato } if (firstBatch) { - vectorExtractRowDynBatch = new VectorExtractRowDynBatch(); - vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); + vectorExtractRow = new VectorExtractRow(); + vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); - singleRow = new Object[vectorExtractRowDynBatch.getCount()]; + singleRow = new Object[vectorExtractRow.getCount()]; firstBatch = false; } - - vectorExtractRowDynBatch.setBatchOnEntry(batch); - // VectorizedBatchUtil.debugDisplayBatch( batch, "VectorReduceSinkOperator processOp "); if (batch.selectedInUse) { int selected[] = batch.selected; for (int logical = 0 ; logical < batch.size; logical++) { int batchIndex = selected[logical]; - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); 
super.process(singleRow, tag); } } else { for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) { - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); super.process(singleRow, tag); } } - - vectorExtractRowDynBatch.forgetBatchOnExit(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index 9f0c24e..6979956 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -18,20 +18,422 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.MapOperator; +import org.apache.hadoop.hive.ql.exec.AbstractMapOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import 
org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; +import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.fast.DeserializeRead; +import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead; +import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BinaryComparable; import org.apache.hadoop.io.Writable; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; -public class VectorMapOperator extends MapOperator { +/* + * + * The vectorized MapOperator. + * + * There are 3 modes of reading for vectorization: + * + * 1) One for the Vectorized Input File Format which returns VectorizedRowBatch as the row. + * + * 2) One for using VectorDeserializeRow to deserialize each row into the VectorizedRowBatch. 
+ * Currently, these Input File Formats: + * TEXTFILE + * SEQUENCEFILE + * + * 3) And one using the regular partition deserializer to get the row object and assigning + * the row object into the VectorizedRowBatch with VectorAssignRow. + * This picks up Input File Format not supported by the other two. + */ +public class VectorMapOperator extends AbstractMapOperator { private static final long serialVersionUID = 1L; + /* + * Overall information on this vectorized Map operation. + */ + private transient HashMap<String, VectorPartitionContext> fileToPartitionContextMap; + + private transient Operator<? extends OperatorDesc> oneRootOperator; + + private transient TypeInfo tableStructTypeInfo; + private transient StandardStructObjectInspector tableStandardStructObjectInspector; + + private transient TypeInfo[] tableRowTypeInfos; + + private transient VectorizedRowBatchCtx batchContext; + // The context for creating the VectorizedRowBatch for this Map node that + // the Vectorizer class determined. + + /* + * A different batch for vectorized Input File Format readers so they can do their work + * overlapped with work of the row collection that vector/row deserialization does. This allows + * the partitions to mix modes (e.g. for us to flush the previously batched rows on file change). + */ + private transient VectorizedRowBatch vectorizedInputFileFormatBatch; + + /* + * This batch is only used by vector/row deserializer readers. + */ + private transient VectorizedRowBatch deserializerBatch; + + private transient long batchCounter; + + private transient int dataColumnCount; + private transient int partitionColumnCount; + private transient Object[] partitionValues; + + private transient boolean[] columnsToIncludeTruncated; + + /* + * The following members have context information for the current partition file being read. 
+ */ + private transient VectorMapOperatorReadType currentReadType; + private transient VectorPartitionContext currentVectorPartContext; + // Current vector map operator read type and context. + + private transient int currentDataColumnCount; + // The number of data columns that the current reader will return. + // Only applicable for vector/row deserialization. + + private transient DeserializeRead currentDeserializeRead; + private transient VectorDeserializeRow currentVectorDeserializeRow; + // When we are doing vector deserialization, these are the fast deserializer and + // the vector row deserializer. + + private Deserializer currentPartDeserializer; + private StructObjectInspector currentPartRawRowObjectInspector; + private VectorAssignRow currentVectorAssign; + // When we are doing row deserialization, these are the regular deserializer, + // partition object inspector, and vector row assigner. + + /* + * The abstract context for the 3 kinds of vectorized reading. + */ + protected abstract class VectorPartitionContext { + + protected final PartitionDesc partDesc; + + String tableName; + String partName; + + /* + * Initialization here is adapted from MapOperator.MapOpCtx.initObjectInspector method. + */ + private VectorPartitionContext(PartitionDesc partDesc) { + this.partDesc = partDesc; + + TableDesc td = partDesc.getTableDesc(); + + // Use table properties in case of unpartitioned tables, + // and the union of table properties and partition properties, with partition + // taking precedence, in the case of partitioned tables + Properties overlayedProps = + SerDeUtils.createOverlayedProperties(td.getProperties(), partDesc.getProperties()); + + Map<String, String> partSpec = partDesc.getPartSpec(); + + tableName = String.valueOf(overlayedProps.getProperty("name")); + partName = String.valueOf(partSpec); + + } + + public PartitionDesc getPartDesc() { + return partDesc; + } + + /* + * Override this for concrete initialization. 
+ */ + public abstract void init(Configuration hconf) + throws SerDeException, Exception; + + /* + * How many data columns is the partition reader actually supplying? + */ + public abstract int getReaderDataColumnCount(); + } + + /* + * Context for reading a Vectorized Input File Format. + */ + protected class VectorizedInputFileFormatPartitionContext extends VectorPartitionContext { + + private VectorizedInputFileFormatPartitionContext(PartitionDesc partDesc) { + super(partDesc); + } + + public void init(Configuration hconf) { + } + + @Override + public int getReaderDataColumnCount() { + throw new RuntimeException("Not applicable"); + } + } + + /* + * Context for using VectorDeserializeRow to deserialize each row from the Input File Format + * into the VectorizedRowBatch. + */ + protected class VectorDeserializePartitionContext extends VectorPartitionContext { + + // This helper object deserializes known deserialization / input file format combination into + // columns of a row in a vectorized row batch. + private VectorDeserializeRow vectorDeserializeRow; + + private DeserializeRead deserializeRead; + + private int readerColumnCount; + + private VectorDeserializePartitionContext(PartitionDesc partDesc) { + super(partDesc); + } + + public VectorDeserializeRow getVectorDeserializeRow() { + return vectorDeserializeRow; + } + + DeserializeRead getDeserializeRead() { + return deserializeRead; + } + + @Override + public int getReaderDataColumnCount() { + return readerColumnCount; + } + + public void init(Configuration hconf) + throws SerDeException, HiveException { + VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc(); + + // This type information specifies the data types the partition needs to read. 
+ TypeInfo[] dataTypeInfos = vectorPartDesc.getDataTypeInfos(); + + switch (vectorPartDesc.getVectorDeserializeType()) { + case LAZY_SIMPLE: + { + LazySerDeParameters simpleSerdeParams = + new LazySerDeParameters(hconf, partDesc.getTableDesc().getProperties(), + LazySimpleSerDe.class.getName()); + + LazySimpleDeserializeRead lazySimpleDeserializeRead = + new LazySimpleDeserializeRead(dataTypeInfos, simpleSerdeParams); + + vectorDeserializeRow = + new VectorDeserializeRow<LazySimpleDeserializeRead>(lazySimpleDeserializeRead); + + // Initialize with data row type conversion parameters. + readerColumnCount = + vectorDeserializeRow.initConversion(tableRowTypeInfos, columnsToIncludeTruncated); + + deserializeRead = lazySimpleDeserializeRead; + } + break; + + case LAZY_BINARY: + { + LazyBinaryDeserializeRead lazyBinaryDeserializeRead = + new LazyBinaryDeserializeRead(dataTypeInfos); + + vectorDeserializeRow = + new VectorDeserializeRow<LazyBinaryDeserializeRead>(lazyBinaryDeserializeRead); + + // Initialize with data row type conversion parameters. 
+ readerColumnCount = + vectorDeserializeRow.initConversion(tableRowTypeInfos, columnsToIncludeTruncated); + + deserializeRead = lazyBinaryDeserializeRead; + } + break; + + default: + throw new RuntimeException( + "Unexpected vector deserialize row type " + vectorPartDesc.getVectorDeserializeType().name()); + } + } + } + + /* + * Context for reading using the regular partition deserializer to get the row object and + * assigning the row object into the VectorizedRowBatch with VectorAssignRow + */ + protected class RowDeserializePartitionContext extends VectorPartitionContext { + + private Deserializer partDeserializer; + private StructObjectInspector partRawRowObjectInspector; + private VectorAssignRow vectorAssign; + + private int readerColumnCount; + + private RowDeserializePartitionContext(PartitionDesc partDesc) { + super(partDesc); + } + + public Deserializer getPartDeserializer() { + return partDeserializer; + } + + public StructObjectInspector getPartRawRowObjectInspector() { + return partRawRowObjectInspector; + } + + public VectorAssignRow getVectorAssign() { + return vectorAssign; + } + + @Override + public int getReaderDataColumnCount() { + return readerColumnCount; + } + + public void init(Configuration hconf) + throws Exception { + VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc(); + + partDeserializer = partDesc.getDeserializer(hconf); + + if (partDeserializer instanceof OrcSerde) { + + // UNDONE: We need to get the table schema inspector from self-describing Input File + // Formats like ORC. Modify the ORC serde instead? For now, this works. + + partRawRowObjectInspector = + (StructObjectInspector) OrcStruct.createObjectInspector(tableStructTypeInfo); + + } else { + partRawRowObjectInspector = + (StructObjectInspector) partDeserializer.getObjectInspector(); + } + + TypeInfo[] dataTypeInfos = vectorPartDesc.getDataTypeInfos(); + + vectorAssign = new VectorAssignRow(); + + // Initialize with data type conversion parameters. 
+ readerColumnCount = + vectorAssign.initConversion(dataTypeInfos, tableRowTypeInfos, columnsToIncludeTruncated); + } + } + + public VectorPartitionContext createAndInitPartitionContext(PartitionDesc partDesc, + Configuration hconf) + throws SerDeException, Exception { + + VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc(); + VectorPartitionContext vectorPartitionContext; + VectorMapOperatorReadType vectorMapOperatorReadType = + vectorPartDesc.getVectorMapOperatorReadType(); + + if (vectorMapOperatorReadType == VectorMapOperatorReadType.VECTOR_DESERIALIZE || + vectorMapOperatorReadType == VectorMapOperatorReadType.ROW_DESERIALIZE) { + // Verify hive.exec.schema.evolution is true or we have an ACID table so we are producing + // the table schema from ORC. The Vectorizer class assures this. + boolean isAcid = + AcidUtils.isTablePropertyTransactional(partDesc.getTableDesc().getProperties()); + Preconditions.checkState(Utilities.isSchemaEvolutionEnabled(hconf, isAcid)); + } + + switch (vectorMapOperatorReadType) { + case VECTORIZED_INPUT_FILE_FORMAT: + vectorPartitionContext = new VectorizedInputFileFormatPartitionContext(partDesc); + break; + + case VECTOR_DESERIALIZE: + vectorPartitionContext = new VectorDeserializePartitionContext(partDesc); + break; + + case ROW_DESERIALIZE: + vectorPartitionContext = new RowDeserializePartitionContext(partDesc); + break; + + default: + throw new RuntimeException("Unexpected vector MapOperator read type " + + vectorMapOperatorReadType.name()); + } + + vectorPartitionContext.init(hconf); + + return vectorPartitionContext; + } + + private void determineColumnsToInclude(Configuration hconf) { + + columnsToIncludeTruncated = null; + + List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(hconf); + if (columnsToIncludeTruncatedList != null && + columnsToIncludeTruncatedList.size() > 0 && columnsToIncludeTruncatedList.size() < dataColumnCount ) { + + // Partitioned columns will not be in 
the include list. + + boolean[] columnsToInclude = new boolean[dataColumnCount]; + Arrays.fill(columnsToInclude, false); + for (int columnNum : columnsToIncludeTruncatedList) { + columnsToInclude[columnNum] = true; + } + + // Work backwards to find the highest wanted column. + + int highestWantedColumnNum = -1; + for (int i = dataColumnCount - 1; i >= 0; i--) { + if (columnsToInclude[i]) { + highestWantedColumnNum = i; + break; + } + } + if (highestWantedColumnNum == -1) { + throw new RuntimeException("No columns to include?"); + } + int newColumnCount = highestWantedColumnNum + 1; + if (newColumnCount == dataColumnCount) { + columnsToIncludeTruncated = columnsToInclude; + } else { + columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount); + } + } + } + /** Kryo ctor. */ - @VisibleForTesting public VectorMapOperator() { super(); } @@ -40,29 +442,445 @@ public class VectorMapOperator extends MapOperator { super(ctx); } + + /* + * This is the same as the setChildren method below but for empty tables. + */ + @Override + public void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf) + throws SerDeException, Exception { + + // Get the single TableScanOperator. Vectorization only supports one input tree. + Preconditions.checkState(children.size() == 1); + oneRootOperator = children.get(0); + + internalSetChildren(hconf); + } + + @Override + public void setChildren(Configuration hconf) throws Exception { + + // Get the single TableScanOperator. Vectorization only supports one input tree. + Iterator<Operator<? extends OperatorDesc>> aliasToWorkIterator = + conf.getAliasToWork().values().iterator(); + oneRootOperator = aliasToWorkIterator.next(); + Preconditions.checkState(!aliasToWorkIterator.hasNext()); + + internalSetChildren(hconf); + } + + /* + * Create information for vector map operator. + * The member oneRootOperator has been set. 
+ */ + private void internalSetChildren(Configuration hconf) throws Exception { + + // The setupPartitionContextVars uses the prior read type to flush the prior deserializerBatch, + // so set it here to none. + currentReadType = VectorMapOperatorReadType.NONE; + + determineColumnsToInclude(hconf); + + batchContext = conf.getVectorizedRowBatchCtx(); + + /* + * Use a different batch for vectorized Input File Format readers so they can do their work + * overlapped with work of the row collection that vector/row deserialization does. This allows + * the partitions to mix modes (e.g. for us to flush the previously batched rows on file change). + */ + vectorizedInputFileFormatBatch = + batchContext.createVectorizedRowBatch(columnsToIncludeTruncated); + conf.setVectorizedRowBatch(vectorizedInputFileFormatBatch); + + /* + * This batch is used by vector/row deserializer readers. + */ + deserializerBatch = batchContext.createVectorizedRowBatch(columnsToIncludeTruncated); + + batchCounter = 0; + + dataColumnCount = batchContext.getDataColumnCount(); + partitionColumnCount = batchContext.getPartitionColumnCount(); + partitionValues = new Object[partitionColumnCount]; + + /* + * Create table related objects + */ + tableStructTypeInfo = + TypeInfoFactory.getStructTypeInfo( + Arrays.asList(batchContext.getRowColumnNames()), + Arrays.asList(batchContext.getRowColumnTypeInfos())); + tableStandardStructObjectInspector = + (StandardStructObjectInspector) + TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(tableStructTypeInfo); + + tableRowTypeInfos = batchContext.getRowColumnTypeInfos(); + + /* + * The Vectorizer class enforces that there is only one TableScanOperator, so + * we don't need the more complicated multiple root operator mapping that MapOperator has. + */ + fileToPartitionContextMap = new HashMap<String, VectorPartitionContext>(); + + // Temporary map so we only create one partition context entry. 
+ HashMap<PartitionDesc, VectorPartitionContext> partitionContextMap = + new HashMap<PartitionDesc, VectorPartitionContext>(); + + for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) { + String path = entry.getKey(); + PartitionDesc partDesc = conf.getPathToPartitionInfo().get(path); + ArrayList<String> aliases = entry.getValue(); + + VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc(); + LOG.info("VectorMapOperator path: " + path + + ", read type " + vectorPartDesc.getVectorMapOperatorReadType().name() + + ", vector deserialize type " + vectorPartDesc.getVectorDeserializeType().name() + + ", aliases " + aliases); + + VectorPartitionContext vectorPartitionContext; + if (!partitionContextMap.containsKey(partDesc)) { + vectorPartitionContext = createAndInitPartitionContext(partDesc, hconf); + partitionContextMap.put(partDesc, vectorPartitionContext); + } else { + vectorPartitionContext = partitionContextMap.get(partDesc); + } + + fileToPartitionContextMap.put(path, vectorPartitionContext); + } + + // Create list of one. + List<Operator<? extends OperatorDesc>> children = + new ArrayList<Operator<? 
extends OperatorDesc>>(); + children.add(oneRootOperator); + + setChildOperators(children); + } + + @Override + public void initializeMapOperator(Configuration hconf) throws HiveException { + super.initializeMapOperator(hconf); + + oneRootOperator.initialize(hconf, new ObjectInspector[] {tableStandardStructObjectInspector}); + } + + public void initializeContexts() throws HiveException { + Path fpath = getExecContext().getCurrentInputPath(); + String nominalPath = getNominalPath(fpath); + setupPartitionContextVars(nominalPath); + } + + // Find context for current input file + @Override + public void cleanUpInputFileChangedOp() throws HiveException { + super.cleanUpInputFileChangedOp(); + Path fpath = getExecContext().getCurrentInputPath(); + String nominalPath = getNominalPath(fpath); + + setupPartitionContextVars(nominalPath); + + // Add alias, table name, and partitions to hadoop conf so that their + // children will inherit these + oneRootOperator.setInputContext(currentVectorPartContext.tableName, + currentVectorPartContext.partName); + } + + /* + * Setup the context for reading from the next partition file. + */ + private void setupPartitionContextVars(String nominalPath) throws HiveException { + + currentVectorPartContext = fileToPartitionContextMap.get(nominalPath); + PartitionDesc partDesc = currentVectorPartContext.getPartDesc(); + VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc(); + currentReadType = vectorPartDesc.getVectorMapOperatorReadType(); + + /* + * Setup for 3 different kinds of vectorized reading supported: + * + * 1) Read the Vectorized Input File Format which returns VectorizedRowBatch as the row. + * + * 2) Read using VectorDeserializeRow to deserialize each row into the VectorizedRowBatch. + * + * 3) And read using the regular partition deserializer to get the row object and assigning + * the row object into the VectorizedRowBatch with VectorAssignRow. 
+ */ + if (currentReadType == VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) { + + /* + * The Vectorized Input File Format reader is responsible for setting the partition column + * values, resetting and filling in the batch, etc. + */ + + /* + * Clear all the reading variables. + */ + currentDataColumnCount = 0; + + currentDeserializeRead = null; + currentVectorDeserializeRow = null; + + currentPartDeserializer = null; + currentPartRawRowObjectInspector = null; + currentVectorAssign = null; + + } else { + + /* + * We will get "regular" single rows from the Input File Format reader that we will need + * to {vector|row} deserialize. + */ + Preconditions.checkState( + currentReadType == VectorMapOperatorReadType.VECTOR_DESERIALIZE || + currentReadType == VectorMapOperatorReadType.ROW_DESERIALIZE); + + if (deserializerBatch.size > 0) { + + /* + * Clear out any rows in the batch from previous partition since we are going to change + * the repeating partition column values. + */ + batchCounter++; + oneRootOperator.process(deserializerBatch, 0); + deserializerBatch.reset(); + if (oneRootOperator.getDone()) { + setDone(true); + return; + } + + } + + /* + * For this particular file, how many columns will we actually read? + */ + currentDataColumnCount = currentVectorPartContext.getReaderDataColumnCount(); + + if (currentDataColumnCount < dataColumnCount) { + + /* + * Default any additional data columns to NULL once for the file. + */ + for (int i = currentDataColumnCount; i < dataColumnCount; i++) { + ColumnVector colVector = deserializerBatch.cols[i]; + colVector.isNull[0] = true; + colVector.noNulls = false; + colVector.isRepeating = true; + } + } + + if (batchContext.getPartitionColumnCount() > 0) { + + /* + * The partition columns are set once for the partition and are marked repeating. 
+ */ + VectorizedRowBatchCtx.getPartitionValues(batchContext, partDesc, partitionValues); + batchContext.addPartitionColsToBatch(deserializerBatch, partitionValues); + } + + /* + * Set or clear the rest of the reading variables based on {vector|row} deserialization. + */ + switch (currentReadType) { + case VECTOR_DESERIALIZE: + { + VectorDeserializePartitionContext vectorDeserPartContext = + (VectorDeserializePartitionContext) currentVectorPartContext; + + // Set ours. + currentDeserializeRead = vectorDeserPartContext.getDeserializeRead(); + currentVectorDeserializeRow = vectorDeserPartContext.getVectorDeserializeRow(); + + // Clear the other ones. + currentPartDeserializer = null; + currentPartRawRowObjectInspector = null; + currentVectorAssign = null; + + } + break; + + case ROW_DESERIALIZE: + { + RowDeserializePartitionContext rowDeserPartContext = + (RowDeserializePartitionContext) currentVectorPartContext; + + // Clear the other ones. + currentDeserializeRead = null; + currentVectorDeserializeRow = null; + + // Set ours. + currentPartDeserializer = rowDeserPartContext.getPartDeserializer(); + currentPartRawRowObjectInspector = rowDeserPartContext.getPartRawRowObjectInspector(); + currentVectorAssign = rowDeserPartContext.getVectorAssign(); + } + break; + + default: + throw new RuntimeException("Unexpected VectorMapOperator read type " + + currentReadType.name()); + } + } + } + + @Override + public Deserializer getCurrentDeserializer() { + // Not applicable. + return null; + } + @Override public void process(Writable value) throws HiveException { + // A mapper can span multiple files/partitions. 
- // The serializers need to be reset if the input file changed + // The VectorPartitionContext needs to be changed if the input file changed ExecMapperContext context = getExecContext(); if (context != null && context.inputFileChanged()) { // The child operators cleanup if input file has changed cleanUpInputFileChanged(); } - // The row has been converted to comply with table schema, irrespective of partition schema. - // So, use tblOI (and not partOI) for forwarding - try { - int childrenDone = 0; - for (MapOpCtx current : currentCtxs) { - if (!current.forward(value)) { - childrenDone++; + if (!oneRootOperator.getDone()) { + + /* + * 3 different kinds of vectorized reading supported: + * + * 1) Read the Vectorized Input File Format which returns VectorizedRowBatch as the row. + * + * 2) Read using VectorDeserializeRow to deserialize each row into the VectorizedRowBatch. + * + * 3) And read using the regular partition deserializer to get the row object and assigning + * the row object into the VectorizedRowBatch with VectorAssignRow. + */ + try { + if (currentReadType == VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) { + + /* + * The Vectorized Input File Format reader has already set the partition column + * values, reset and filled in the batch, etc. + * + * We pass the VectorizedRowBatch through here. + */ + batchCounter++; + oneRootOperator.process(value, 0); + if (oneRootOperator.getDone()) { + setDone(true); + return; + } + + } else { + + /* + * We have a "regular" single row from the Input File Format reader that we will need + * to deserialize. + */ + Preconditions.checkState( + currentReadType == VectorMapOperatorReadType.VECTOR_DESERIALIZE || + currentReadType == VectorMapOperatorReadType.ROW_DESERIALIZE); + + if (deserializerBatch.size == deserializerBatch.DEFAULT_SIZE) { + + /* + * Feed current full batch to operator tree. + */ + batchCounter++; + oneRootOperator.process(deserializerBatch, 0); + + /** + * Only reset the current data columns.
Not any data columns defaulted to NULL + * because they are not present in the partition, and not partition columns. + */ + for (int c = 0; c < currentDataColumnCount; c++) { + deserializerBatch.cols[c].reset(); + deserializerBatch.cols[c].init(); + } + deserializerBatch.selectedInUse = false; + deserializerBatch.size = 0; + deserializerBatch.endOfFile = false; + + if (oneRootOperator.getDone()) { + setDone(true); + return; + } + } + + /* + * Do the {vector|row} deserialization of the one row into the VectorizedRowBatch. + */ + switch (currentReadType) { + case VECTOR_DESERIALIZE: + { + BinaryComparable binComp = (BinaryComparable) value; + currentDeserializeRead.set(binComp.getBytes(), 0, binComp.getLength()); + + // Deserialize and append new row using the current batch size as the index. + currentVectorDeserializeRow.deserialize(deserializerBatch, deserializerBatch.size++); + } + break; + + case ROW_DESERIALIZE: + { + Object deserialized = currentPartDeserializer.deserialize(value); + + // Note: Regardless of what the Input File Format returns, we have determined + // with VectorAppendRow.initConversion that only currentDataColumnCount columns + // have values we want. + // + // Any extra columns needed by the table schema were set to repeating null + // in the batch by setupPartitionContextVars. + + // Convert input row to standard objects. + List<Object> standardObjects = new ArrayList<Object>(); + ObjectInspectorUtils.copyToStandardObject(standardObjects, deserialized, + currentPartRawRowObjectInspector, ObjectInspectorCopyOption.WRITABLE); + if (standardObjects.size() < currentDataColumnCount) { + throw new HiveException("Input File Format returned row with too few columns"); + } + + // Append the deserialized standard object row using the current batch size + // as the index. 
+ currentVectorAssign.assignRow(deserializerBatch, deserializerBatch.size++, + standardObjects, currentDataColumnCount); + } + break; + + default: + throw new RuntimeException("Unexpected vector MapOperator read type " + + currentReadType.name()); + } } + } catch (Exception e) { + throw new HiveException("Hive Runtime Error while processing row ", e); } + } + } + + @Override + public void process(Object row, int tag) throws HiveException { + throw new HiveException("Hive 2 Internal error: should not be called!"); + } - rowsForwarded(childrenDone, ((VectorizedRowBatch)value).size); - } catch (Exception e) { - throw new HiveException("Hive Runtime Error while processing row ", e); + @Override + public void closeOp(boolean abort) throws HiveException { + if (!abort && oneRootOperator != null && !oneRootOperator.getDone() && + currentReadType != VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) { + if (deserializerBatch.size > 0) { + batchCounter++; + oneRootOperator.process(deserializerBatch, 0); + deserializerBatch.size = 0; + } } + super.closeOp(abort); + } + + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "MAP"; + } + + @Override + public OperatorType getType() { + return null; } } http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java index 74e5130..dd5e20f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java @@ -40,7 +40,7 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator { private transient boolean firstBatch; - 
private transient VectorExtractRowDynBatch vectorExtractRowDynBatch; + private transient VectorExtractRow vectorExtractRow; protected transient Object[] singleRow; @@ -81,32 +81,28 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator { VectorizedRowBatch batch = (VectorizedRowBatch) data; if (firstBatch) { - vectorExtractRowDynBatch = new VectorExtractRowDynBatch(); - vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); + vectorExtractRow = new VectorExtractRow(); + vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); - singleRow = new Object[vectorExtractRowDynBatch.getCount()]; + singleRow = new Object[vectorExtractRow.getCount()]; firstBatch = false; } - vectorExtractRowDynBatch.setBatchOnEntry(batch); - // VectorizedBatchUtil.debugDisplayBatch( batch, "VectorReduceSinkOperator processOp "); if (batch.selectedInUse) { int selected[] = batch.selected; for (int logical = 0 ; logical < batch.size; logical++) { int batchIndex = selected[logical]; - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); super.process(singleRow, tag); } } else { for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) { - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); super.process(singleRow, tag); } } - - vectorExtractRowDynBatch.forgetBatchOnExit(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java index 85c8506..59153c8 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java @@ -76,7 +76,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect private transient VectorHashKeyWrapperBatch keyWrapperBatch; - private transient Map<ObjectInspector, VectorAssignRowSameBatch> outputVectorAssignRowMap; + private transient Map<ObjectInspector, VectorAssignRow> outputVectorAssignRowMap; private transient int batchIndex = -1; @@ -159,7 +159,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect keyWrapperBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); - outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRowSameBatch>(); + outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRow>(); // This key evaluator translates from the vectorized VectorHashKeyWrapper format // into the row-mode MapJoinKey @@ -287,15 +287,14 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect @Override protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException { Object[] values = (Object[]) row; - VectorAssignRowSameBatch va = outputVectorAssignRowMap.get(outputOI); + VectorAssignRow va = outputVectorAssignRowMap.get(outputOI); if (va == null) { - va = new VectorAssignRowSameBatch(); + va = new VectorAssignRow(); va.init((StructObjectInspector) outputOI, vOutContext.getProjectedColumns()); - va.setOneBatch(outputBatch); outputVectorAssignRowMap.put(outputOI, va); } - va.assignRow(outputBatch.size, values); + va.assignRow(outputBatch, outputBatch.size, values); ++outputBatch.size; if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java index 3d0b571..51d1436 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java @@ -46,7 +46,7 @@ public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator private transient boolean firstBatch; - private transient VectorExtractRowDynBatch vectorExtractRowDynBatch; + private transient VectorExtractRow vectorExtractRow; protected transient Object[] singleRow; @@ -82,28 +82,26 @@ public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator VectorizedRowBatch batch = (VectorizedRowBatch) row; if (firstBatch) { - vectorExtractRowDynBatch = new VectorExtractRowDynBatch(); - vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); + vectorExtractRow = new VectorExtractRow(); + vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); - singleRow = new Object[vectorExtractRowDynBatch.getCount()]; + singleRow = new Object[vectorExtractRow.getCount()]; firstBatch = false; } - vectorExtractRowDynBatch.setBatchOnEntry(batch); + if (batch.selectedInUse) { int selected[] = batch.selected; for (int logical = 0 ; logical < batch.size; logical++) { int batchIndex = selected[logical]; - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); super.process(singleRow, tag); } } else { for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) { - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); 
super.process(singleRow, tag); } } - - vectorExtractRowDynBatch.forgetBatchOnExit(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java index e7ac531..2dc4d0e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java @@ -42,7 +42,7 @@ public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruni protected transient boolean firstBatch; - protected transient VectorExtractRowDynBatch vectorExtractRowDynBatch; + protected transient VectorExtractRow vectorExtractRow; protected transient Object[] singleRow; @@ -77,27 +77,24 @@ public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruni public void process(Object data, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) data; if (firstBatch) { - vectorExtractRowDynBatch = new VectorExtractRowDynBatch(); - vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], + vectorExtractRow = new VectorExtractRow(); + vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); - singleRow = new Object[vectorExtractRowDynBatch.getCount()]; + singleRow = new Object[vectorExtractRow.getCount()]; firstBatch = false; } - vectorExtractRowDynBatch.setBatchOnEntry(batch); ObjectInspector rowInspector = inputObjInspectors[0]; try { Writable writableRow; for (int logical = 0; logical < batch.size; logical++) { int batchIndex = batch.selectedInUse ? 
batch.selected[logical] : logical; - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); writableRow = serializer.serialize(singleRow, rowInspector); writableRow.write(buffer); } } catch (Exception e) { throw new HiveException(e); } - - vectorExtractRowDynBatch.forgetBatchOnExit(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index 86025ef..5c55011 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -68,13 +68,11 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUD import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxLong; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxString; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxTimestamp; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxIntervalDayTime; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDecimal; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDouble; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinLong; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinString; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinTimestamp; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinIntervalDayTime; import 
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDecimal; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDouble; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopLong; @@ -171,10 +169,6 @@ public class VectorizationContext { public VectorizationContext(String contextName, List<String> initialColumnNames) { this.contextName = contextName; level = 0; - if (LOG.isDebugEnabled()) { - LOG.debug("VectorizationContext consructor contextName " + contextName + " level " - + level + " initialColumnNames " + initialColumnNames); - } this.initialColumnNames = initialColumnNames; this.projectionColumnNames = initialColumnNames; @@ -195,9 +189,6 @@ public class VectorizationContext { public VectorizationContext(String contextName) { this.contextName = contextName; level = 0; - if (LOG.isDebugEnabled()) { - LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level); - } initialColumnNames = new ArrayList<String>(); projectedColumns = new ArrayList<Integer>(); projectionColumnNames = new ArrayList<String>(); @@ -213,7 +204,6 @@ public class VectorizationContext { public VectorizationContext(String contextName, VectorizationContext vContext) { this.contextName = contextName; level = vContext.level + 1; - LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level); this.initialColumnNames = vContext.initialColumnNames; this.projectedColumns = new ArrayList<Integer>(); this.projectionColumnNames = new ArrayList<String>(); @@ -485,7 +475,7 @@ public class VectorizationContext { throw new HiveException("Could not vectorize expression: "+exprDesc.getName()); } if (LOG.isDebugEnabled()) { - LOG.debug("Input Expression = " + exprDesc.getTypeInfo() + LOG.debug("Input Expression = " + exprDesc.toString() + ", Vectorized Expression = " + ve.toString()); } return ve; 
http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java index be04da8..9471e66 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.io.IOException; +import java.sql.Date; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; @@ -28,8 +29,11 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveVarchar; @@ -49,12 +53,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; @@ -63,6 +69,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; @@ -70,6 +77,7 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hive.common.util.DateUtils; public class VectorizedBatchUtil { @@ -638,6 +646,47 @@ public class VectorizedBatchUtil { return newBatch; } + public static Writable getPrimitiveWritable(PrimitiveCategory primitiveCategory) { + switch (primitiveCategory) { + case VOID: + return null; + case BOOLEAN: + return new BooleanWritable(false); + case BYTE: + return new ByteWritable((byte) 0); + case SHORT: + return new ShortWritable((short) 0); + case INT: + return new IntWritable(0); + case LONG: + return new LongWritable(0); + case TIMESTAMP: + return new TimestampWritable(new Timestamp(0)); + case DATE: + return new DateWritable(new Date(0)); + case FLOAT: + return new FloatWritable(0); + case DOUBLE: + return new DoubleWritable(0); + case BINARY: + return new BytesWritable(ArrayUtils.EMPTY_BYTE_ARRAY); + case STRING: + return new 
Text(ArrayUtils.EMPTY_BYTE_ARRAY); + case VARCHAR: + return new HiveVarcharWritable(new HiveVarchar(StringUtils.EMPTY, -1)); + case CHAR: + return new HiveCharWritable(new HiveChar(StringUtils.EMPTY, -1)); + case DECIMAL: + return new HiveDecimalWritable(); + case INTERVAL_YEAR_MONTH: + return new HiveIntervalYearMonthWritable(); + case INTERVAL_DAY_TIME: + return new HiveIntervalDayTimeWritable(); + default: + throw new RuntimeException("Primitive category " + primitiveCategory.name() + " not supported"); + } + } + public static String displayBytes(byte[] bytes, int start, int length) { StringBuilder sb = new StringBuilder(); for (int i = start; i < start + length; i++) { http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 5cbace4..6a3d64b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -186,7 +186,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC int length = byteSegmentRef.getLength(); smallTableVectorDeserializeRow.setBytes(bytes, offset, length); - smallTableVectorDeserializeRow.deserializeByValue(batch, batchIndex); + smallTableVectorDeserializeRow.deserialize(batch, batchIndex); } // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, "generateHashMapResultSingleValue big table"); @@ -253,7 +253,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC int length = byteSegmentRef.getLength(); 
smallTableVectorDeserializeRow.setBytes(bytes, offset, length); - smallTableVectorDeserializeRow.deserializeByValue(overflowBatch, overflowBatch.size); + smallTableVectorDeserializeRow.deserialize(overflowBatch, overflowBatch.size); } // VectorizedBatchUtil.debugDisplayOneRow(overflowBatch, overflowBatch.size, "generateHashMapResultMultiValue overflow"); @@ -304,7 +304,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC int length = byteSegmentRef.getLength(); smallTableVectorDeserializeRow.setBytes(bytes, offset, length); - smallTableVectorDeserializeRow.deserializeByValue(overflowBatch, overflowBatch.DEFAULT_SIZE); + smallTableVectorDeserializeRow.deserialize(overflowBatch, overflowBatch.DEFAULT_SIZE); } overflowBatch.size++; @@ -545,7 +545,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC // LOG.debug(CLASS_NAME + " reProcessBigTable serialized row #" + rowCount + ", offset " + offset + ", length " + length); bigTableVectorDeserializeRow.setBytes(bytes, offset, length); - bigTableVectorDeserializeRow.deserializeByValue(spillReplayBatch, spillReplayBatch.size); + bigTableVectorDeserializeRow.deserialize(spillReplayBatch, spillReplayBatch.size); spillReplayBatch.size++; if (spillReplayBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java index 1877f14..d02d1db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java 
@@ -30,19 +30,19 @@ public class VectorMapJoinFastLongHashUtil { long key = 0; switch (hashTableKeyType) { case BOOLEAN: - key = (keyBinarySortableDeserializeRead.readBoolean() ? 1 : 0); + key = (keyBinarySortableDeserializeRead.currentBoolean ? 1 : 0); break; case BYTE: - key = (long) keyBinarySortableDeserializeRead.readByte(); + key = (long) keyBinarySortableDeserializeRead.currentByte; break; case SHORT: - key = (long) keyBinarySortableDeserializeRead.readShort(); + key = (long) keyBinarySortableDeserializeRead.currentShort; break; case INT: - key = (long) keyBinarySortableDeserializeRead.readInt(); + key = (long) keyBinarySortableDeserializeRead.currentInt; break; case LONG: - key = keyBinarySortableDeserializeRead.readLong(); + key = keyBinarySortableDeserializeRead.currentLong; break; default: throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name()); http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java index adb8044..985fb1c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead; -import org.apache.hadoop.hive.serde2.fast.DeserializeRead.ReadStringResults; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import 
org.apache.hadoop.io.BytesWritable; @@ -40,8 +39,6 @@ public class VectorMapJoinFastStringCommon { private BinarySortableDeserializeRead keyBinarySortableDeserializeRead; - private ReadStringResults readStringResults; - public void adaptPutRow(VectorMapJoinFastBytesHashTable hashTable, BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException { @@ -51,9 +48,11 @@ public class VectorMapJoinFastStringCommon { if (keyBinarySortableDeserializeRead.readCheckNull()) { return; } - keyBinarySortableDeserializeRead.readString(readStringResults); - hashTable.add(readStringResults.bytes, readStringResults.start, readStringResults.length, + hashTable.add( + keyBinarySortableDeserializeRead.currentBytes, + keyBinarySortableDeserializeRead.currentBytesStart, + keyBinarySortableDeserializeRead.currentBytesLength, currentValue); } @@ -61,6 +60,5 @@ public class VectorMapJoinFastStringCommon { this.isOuterJoin = isOuterJoin; PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo }; keyBinarySortableDeserializeRead = new BinarySortableDeserializeRead(primitiveTypeInfos); - readStringResults = keyBinarySortableDeserializeRead.createReadStringResults(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 46a5413..cfedf35 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -205,8 +205,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) { return inputFormat; // LLAP not enabled, no-op. 
} - boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface, - isVectorized = Utilities.isVectorMode(conf); + boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface; + boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(conf); if (!isSupported || !isVectorized) { LOG.info("Not using llap for " + inputFormat + ": supported = " + isSupported + ", vectorized = " + isVectorized); @@ -225,7 +225,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } public static boolean canWrapAnyForLlap(Configuration conf, MapWork mapWork) { - return Utilities.isVectorMode(conf, mapWork); + return Utilities.getUseVectorizedInputFileFormat(conf, mapWork); } public static boolean canWrapForLlap(Class<? extends InputFormat> inputFormatClass) { http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java index c53d149..80858a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/NullRowsInputFormat.java @@ -72,7 +72,7 @@ public class NullRowsInputFormat implements InputFormat<NullWritable, NullWritab private boolean addPartitionCols = true; public NullRowsRecordReader(Configuration conf, InputSplit split) throws IOException { - boolean isVectorMode = Utilities.isVectorMode(conf); + boolean isVectorMode = Utilities.getUseVectorizedInputFileFormat(conf); if (LOG.isDebugEnabled()) { LOG.debug("Null record reader in " + (isVectorMode ? 
"" : "non-") + "vector mode"); } http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index fcb8ca4..33fe3b6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -452,7 +452,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, List<FileStatus> files ) throws IOException { - if (Utilities.isVectorMode(conf)) { + if (Utilities.getUseVectorizedInputFileFormat(conf)) { return new VectorizedOrcInputFormat().validateInput(fs, conf, files); } @@ -1640,7 +1640,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> getRecordReader(InputSplit inputSplit, JobConf conf, Reporter reporter) throws IOException { - boolean vectorMode = Utilities.isVectorMode(conf); + boolean vectorMode = Utilities.getUseVectorizedInputFileFormat(conf); boolean isAcidRead = isAcidRead(conf, inputSplit); if (!isAcidRead) { http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index a4e35cb..5b65e5c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -59,7 +59,7 @@ public class MapredParquetInputFormat extends FileInputFormat<NullWritable, Arra final 
org.apache.hadoop.mapred.Reporter reporter ) throws IOException { try { - if (Utilities.isVectorMode(job)) { + if (Utilities.getUseVectorizedInputFileFormat(job)) { if (LOG.isDebugEnabled()) { LOG.debug("Using vectorized record reader"); }
