Repository: incubator-hawq Updated Branches: refs/heads/master 339806f3a -> 29a160839
HAWQ-1446: Introduce vectorized profile for ORC. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/29a16083 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/29a16083 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/29a16083 Branch: refs/heads/master Commit: 29a160839949a1a08244962c5255933f714af46c Parents: 339806f Author: Oleksandr Diachenko <[email protected]> Authored: Wed Jun 21 19:58:08 2017 -0700 Committer: Oleksandr Diachenko <[email protected]> Committed: Wed Jun 21 19:58:08 2017 -0700 ---------------------------------------------------------------------- .../hawq/pxf/api/ReadVectorizedResolver.java | 39 ++ .../org/apache/hawq/pxf/api/StatsAccessor.java | 2 +- .../hawq/pxf/api/utilities/Utilities.java | 22 +- .../hawq/pxf/api/utilities/UtilitiesTest.java | 29 ++ .../pxf/plugins/hive/HiveDataFragmenter.java | 5 +- .../plugins/hive/HiveORCVectorizedAccessor.java | 106 ++++++ .../plugins/hive/HiveORCVectorizedResolver.java | 367 +++++++++++++++++++ .../plugins/hive/utilities/ProfileFactory.java | 15 +- .../hawq/pxf/service/BridgeOutputBuilder.java | 13 + .../org/apache/hawq/pxf/service/ReadBridge.java | 2 +- .../hawq/pxf/service/ReadVectorizedBridge.java | 102 ++++++ .../hawq/pxf/service/rest/BridgeResource.java | 3 + .../src/main/resources/pxf-profiles-default.xml | 11 + 13 files changed, 707 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java new file mode 100644 index 0000000..55f8df5 --- /dev/null +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java @@ -0,0 +1,39 @@ +package org.apache.hawq.pxf.api; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; + +/** + * + * Interface that defines deserialization of a batch of records at once.
+ * + */ +public interface ReadVectorizedResolver { + + /** + * Returns the resolved list of tuples + * + * @param batch unresolved batch + * @return list of tuples + */ + public List<List<OneField>> getFieldsForBatch(OneRow batch); + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java index d256e77..ec65bd8 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java @@ -29,7 +29,7 @@ public interface StatsAccessor extends ReadAccessor { /** * Method which reads needed statistics for current split - * @throws Exception if retrieving the stats failed + * @throws Exception when unable to retrieve statistics */ public void retrieveStats() throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java index ed8ad28..175a6e1 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java @@ -24,6 +24,7 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.ReadVectorizedResolver; import org.apache.hawq.pxf.api.StatsAccessor; import java.io.ByteArrayInputStream; @@ -164,7 +165,7 @@ public class Utilities { * @param inputData input data which has protocol information * @return fragment metadata * @throws IllegalArgumentException if fragment metadata information wasn't found in input data - * @throws Exception if unable to parse the fragment + * @throws Exception when an error occurs during metadata parsing */ public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception { byte[] serializedLocation = inputData.getFragmentMetadata(); @@ -197,8 +198,8 @@ public class Utilities { /** * Based on accessor information determines whether to use AggBridge - * - * @param inputData input data + * + * @param inputData input protocol data * @return true if AggBridge is applicable for current context */ public static boolean useAggBridge(InputData inputData) { @@ -234,4 +235,19 @@ public class Utilities { return false; } } + + /** + * Determines whether to use vectorization + * @param inputData input protocol data + * @return true if vectorization is applicable in the current context + */ + public static boolean useVectorization(InputData inputData) { + boolean isVectorizedResolver = false; + try { + isVectorizedResolver = ArrayUtils.contains(Class.forName(inputData.getResolver()).getInterfaces(), ReadVectorizedResolver.class); + } catch (ClassNotFoundException e) { + LOG.error("Unable to load resolver class: " + e.getMessage()); + } + return isVectorizedResolver; + } }
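To make the new extension point concrete, here is a minimal sketch of a resolver that opts into the vectorized read path; the package and class names are hypothetical and this code is not part of the commit. Note that Utilities.useVectorization() checks only the interfaces a resolver class implements directly, so a plugin must implement ReadVectorizedResolver itself rather than inherit it:

package org.example.pxf; // hypothetical package, for illustration only

import java.util.Collections;
import java.util.List;

import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.ReadVectorizedResolver;

// Implementing ReadVectorizedResolver directly is what makes
// Utilities.useVectorization() return true for this resolver.
public class ExampleBatchResolver implements ReadVectorizedResolver {

    @Override
    public List<List<OneField>> getFieldsForBatch(OneRow batch) {
        // A real implementation would walk the columnar structure in
        // batch.getData() and emit one List<OneField> per row of the batch.
        return Collections.emptyList();
    }
}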
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java index 01c09bf..5caca6d 100644 --- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java +++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java @@ -30,10 +30,14 @@ import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; +import java.util.List; import org.apache.hawq.pxf.api.Metadata; +import org.apache.hawq.pxf.api.OneField; import org.apache.hawq.pxf.api.OneRow; import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.ReadVectorizedResolver; import org.apache.hawq.pxf.api.StatsAccessor; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.Utilities; @@ -89,6 +93,22 @@ public class UtilitiesTest { } } + class ReadVectorizedResolverImpl implements ReadVectorizedResolver { + + @Override + public List<List<OneField>> getFieldsForBatch(OneRow batch) { + return null; + } + } + + class ReadResolverImpl implements ReadResolver { + + @Override + public List<OneField> getFields(OneRow row) throws Exception { + return null; + } + } + @Test public void byteArrayToOctalStringNull() throws Exception { StringBuilder sb = null; @@ -222,4 +242,13 @@ public class UtilitiesTest { when(metaData.getNumAttrsProjected()).thenReturn(1); assertFalse(Utilities.useStats(accessor, metaData)); } + + @Test + public void useVectorization() { + InputData metaData = mock(InputData.class); + when(metaData.getResolver()).thenReturn("org.apache.hawq.pxf.api.utilities.UtilitiesTest$ReadVectorizedResolverImpl"); + assertTrue(Utilities.useVectorization(metaData)); + when(metaData.getResolver()).thenReturn("org.apache.hawq.pxf.api.utilities.UtilitiesTest$ReadResolverImpl"); + assertFalse(Utilities.useVectorization(metaData)); + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java index 9cf8f27..6e193c2 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java @@ -286,10 +286,11 @@ public class HiveDataFragmenter extends Fragmenter { InputFormat<?, ?> fformat = makeInputFormat( tablePartition.storageDesc.getInputFormat(), jobConf); String profile = null; - if (inputData.getProfile() != null) { + String userProfile = inputData.getProfile(); + if (userProfile != null) { // evaluate optimal profile based on file format if profile was explicitly specified in url // if user passed accessor+fragmenter+resolver - use them - profile = ProfileFactory.get(fformat, hasComplexTypes); + profile = ProfileFactory.get(fformat, hasComplexTypes, userProfile); } String fragmenterForProfile = null; if (profile != null) { 
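To illustrate the fragmenter change above: with the new three-argument ProfileFactory.get() (see the ProfileFactory diff further down), an explicit HiveVectorizedORC request from the user is honored as-is, while any other user-specified profile is re-evaluated from the file's input format. A minimal sketch, not part of this commit, assuming the factory's elided ORC branch maps OrcInputFormat to the HiveORC profile as its constants suggest:

import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory;

public class ProfileSelectionSketch {
    public static void main(String[] args) {
        // ORC-backed partition, record has no complex types.
        OrcInputFormat orcFormat = new OrcInputFormat();

        // An explicit HiveVectorizedORC request is passed through untouched ...
        String vectorized = ProfileFactory.get(orcFormat, false, "HiveVectorizedORC");
        System.out.println(vectorized); // HiveVectorizedORC

        // ... while any other user profile is re-evaluated from the input format.
        String optimal = ProfileFactory.get(orcFormat, false, "Hive");
        System.out.println(optimal); // HiveORC
    }
}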
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java new file mode 100644 index 0000000..3de1500 --- /dev/null +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java @@ -0,0 +1,106 @@ +package org.apache.hawq.pxf.plugins.hive; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import org.apache.hadoop.mapred.*; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.orc.Reader.Options; +import org.apache.hadoop.hive.ql.io.orc.RecordReader; +import org.apache.hadoop.io.LongWritable; + +/** + * Accessor class which reads data in batches. + * One batch holds up to 1024 rows of all projected columns. + * + */ +public class HiveORCVectorizedAccessor extends HiveORCAccessor { + + private RecordReader vrr; + private int batchIndex; + private VectorizedRowBatch batch; + + public HiveORCVectorizedAccessor(InputData input) throws Exception { + super(input); + } + + @Override + public boolean openForRead() throws Exception { + Options options = new Options(); + addColumns(options); + addFragments(options); + orcReader = HiveUtilities.getOrcReader(inputData); + vrr = orcReader.rowsOptions(options); + return vrr.hasNext(); + } + + /** + * A file might have multiple splits, so this method restricts + * the reader to one split. + * @param options reader options to modify + */ + private void addFragments(Options options) { + FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData); + options.range(fileSplit.getStart(), fileSplit.getLength()); + } + + /** + * Reads the next batch for the current fragment. + * @return next batch in OneRow format, where the key is the batch number and the data is the batch + */ + @Override + public OneRow readNextObject() throws IOException { + if (vrr.hasNext()) { + batch = vrr.nextBatch(batch); + batchIndex++; + return new OneRow(new LongWritable(batchIndex), batch); + } else { + //All batches are exhausted + return null; + } + } + + /** + * This method updates reader options to include only the projected columns.
+ * @param options reader options to modify + * @throws Exception if building the column projection fails + */ + private void addColumns(Options options) throws Exception { + boolean[] includeColumns = new boolean[inputData.getColumns() + 1]; + for (ColumnDescriptor col : inputData.getTupleDescription()) { + if (col.isProjected()) { + includeColumns[col.columnIndex() + 1] = true; + } + } + options.include(includeColumns); + } + + @Override + public void closeForRead() throws Exception { + if (vrr != null) { + vrr.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java new file mode 100644 index 0000000..5d03d7a --- /dev/null +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java @@ -0,0 +1,367 @@ +package org.apache.hawq.pxf.plugins.hive; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +import static org.apache.hawq.pxf.api.io.DataType.BIGINT; +import static org.apache.hawq.pxf.api.io.DataType.BOOLEAN; +import static org.apache.hawq.pxf.api.io.DataType.BPCHAR; +import static org.apache.hawq.pxf.api.io.DataType.BYTEA; +import static org.apache.hawq.pxf.api.io.DataType.DATE; +import static org.apache.hawq.pxf.api.io.DataType.FLOAT8; +import static org.apache.hawq.pxf.api.io.DataType.INTEGER; +import static org.apache.hawq.pxf.api.io.DataType.NUMERIC; +import static org.apache.hawq.pxf.api.io.DataType.REAL; +import static org.apache.hawq.pxf.api.io.DataType.SMALLINT; +import static org.apache.hawq.pxf.api.io.DataType.TEXT; +import static org.apache.hawq.pxf.api.io.DataType.TIMESTAMP; +import static org.apache.hawq.pxf.api.io.DataType.VARCHAR; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.sql.Timestamp; +import java.sql.Date; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.Text; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadVectorizedResolver; +import org.apache.hawq.pxf.api.UnsupportedTypeException; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hadoop.hive.serde2.*; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.objectinspector.*; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.ql.exec.vector.*; + +/** + * Class which implements resolving a batch of records at once + */ +public class HiveORCVectorizedResolver extends HiveResolver implements ReadVectorizedResolver { + + private static final Log LOG = LogFactory.getLog(HiveORCVectorizedResolver.class); + + private List<List<OneField>> resolvedBatch; + private StructObjectInspector soi; + + public HiveORCVectorizedResolver(InputData input) throws Exception { + super(input); + try { + soi = (StructObjectInspector) HiveUtilities.getOrcReader(input).getObjectInspector(); + } catch (Exception e) { + LOG.error("Unable to create an object inspector."); + throw e; + } + } + + @Override + public List<List<OneField>> getFieldsForBatch(OneRow batch) { + + Writable writableObject = null; + Object fieldValue = null; + VectorizedRowBatch vectorizedBatch = (VectorizedRowBatch) batch.getData(); + + /* Allocate empty result set */ + int columnsNumber = inputData.getColumns(); + resolvedBatch = new ArrayList<List<OneField>>(vectorizedBatch.size); + + /* Create empty 
template row */ + ArrayList<OneField> templateRow = new ArrayList<OneField>(columnsNumber); + ArrayList<OneField> currentRow = null; + for (int j = 0; j < inputData.getColumns(); j++) { + templateRow.add(null); + } + /* Replicate template row*/ + for (int i = 0; i < vectorizedBatch.size; i++) { + currentRow = new ArrayList<OneField>(templateRow); + resolvedBatch.add(currentRow); + } + + /* process all columns*/ + List<? extends StructField> allStructFieldRefs = soi.getAllStructFieldRefs(); + for (int columnIndex = 0; columnIndex < vectorizedBatch.numCols; columnIndex++) { + ObjectInspector oi = allStructFieldRefs.get(columnIndex).getFieldObjectInspector(); + if (oi.getCategory() == Category.PRIMITIVE) { + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi; + resolvePrimitiveColumn(columnIndex, oi, vectorizedBatch); + } else { + throw new UnsupportedTypeException("Unable to resolve column index: " + columnIndex + + ". Only primitive types are supported."); + } + } + + return resolvedBatch; + } + + /** + * Resolves a column of a primitive type out of the given batch + * + * @param columnIndex index of the column + * @param oi object inspector + * @param vectorizedBatch input batch of records + */ + private void resolvePrimitiveColumn(int columnIndex, ObjectInspector oi, VectorizedRowBatch vectorizedBatch) { + + OneField field = null; + Writable writableObject = null; + PrimitiveCategory poc = ((PrimitiveObjectInspector) oi).getPrimitiveCategory(); + populatePrimitiveColumn(poc, oi, vectorizedBatch, columnIndex); + } + + private void addValueToColumn(int columnIndex, int rowIndex, OneField field) { + List<OneField> row = this.resolvedBatch.get(rowIndex); + row.set(columnIndex, field); + } + + private void populatePrimitiveColumn(PrimitiveCategory primitiveCategory, ObjectInspector oi, VectorizedRowBatch vectorizedBatch, int columnIndex) { + ColumnVector columnVector = vectorizedBatch.cols[columnIndex]; + Object fieldValue = null; + DataType fieldType = null; + + switch (primitiveCategory) { + case BOOLEAN: { + fieldType = BOOLEAN; + LongColumnVector lcv = (LongColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (lcv != null) { + int rowId = lcv.isRepeating ? 0 : rowIndex; + if (!lcv.isNull[rowId]) { + fieldValue = lcv.vector[rowId] == 1; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case SHORT: { + fieldType = SMALLINT; + LongColumnVector lcv = (LongColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (lcv != null) { + int rowId = lcv.isRepeating ? 0 : rowIndex; + if (!lcv.isNull[rowId]) { + fieldValue = (short) lcv.vector[rowId]; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case INT: { + fieldType = INTEGER; + LongColumnVector lcv = (LongColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (lcv != null) { + int rowId = lcv.isRepeating ? 0 : rowIndex; + if (!lcv.isNull[rowId]) { + fieldValue = (int) lcv.vector[rowId]; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case LONG: { + fieldType = BIGINT; + LongColumnVector lcv = (LongColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (lcv != null) { + int rowId = lcv.isRepeating ? 0 : rowIndex; + if (!lcv.isNull[rowId]) { + fieldValue = lcv.vector[rowId]; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case FLOAT: { + fieldType = REAL; + DoubleColumnVector dcv = (DoubleColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (dcv != null) { + int rowId = dcv.isRepeating ? 0 : rowIndex; + if (!dcv.isNull[rowId]) { + fieldValue = (float) dcv.vector[rowId]; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case DOUBLE: { + fieldType = FLOAT8; + DoubleColumnVector dcv = (DoubleColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (dcv != null) { + int rowId = dcv.isRepeating ? 0 : rowIndex; + if (!dcv.isNull[rowId]) { + fieldValue = dcv.vector[rowId]; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case DECIMAL: { + fieldType = NUMERIC; + DecimalColumnVector dcv = (DecimalColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (dcv != null) { + int rowId = dcv.isRepeating ? 0 : rowIndex; + if (!dcv.isNull[rowId]) { + fieldValue = dcv.vector[rowId]; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case VARCHAR: { + fieldType = VARCHAR; + BytesColumnVector bcv = (BytesColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (bcv != null) { + int rowId = bcv.isRepeating ? 0 : rowIndex; + if (!bcv.isNull[rowId]) { + Text textValue = new Text(); + textValue.set(bcv.vector[rowId], bcv.start[rowId], bcv.length[rowId]); + fieldValue = textValue; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case CHAR: { + fieldType = BPCHAR; + BytesColumnVector bcv = (BytesColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (bcv != null) { + int rowId = bcv.isRepeating ? 0 : rowIndex; + if (!bcv.isNull[rowId]) { + Text textValue = new Text(); + textValue.set(bcv.vector[rowId], bcv.start[rowId], bcv.length[rowId]); + fieldValue = textValue; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case STRING: { + fieldType = TEXT; + BytesColumnVector bcv = (BytesColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (bcv != null) { + int rowId = bcv.isRepeating ? 0 : rowIndex; + if (!bcv.isNull[rowId]) { + Text textValue = new Text(); + textValue.set(bcv.vector[rowId], bcv.start[rowId], bcv.length[rowId]); + fieldValue = textValue; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case BINARY: { + fieldType = BYTEA; + BytesColumnVector bcv = (BytesColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (bcv != null) { + int rowId = bcv.isRepeating ? 0 : rowIndex; + if (!bcv.isNull[rowId]) { + fieldValue = new byte[bcv.length[rowId]]; + System.arraycopy(bcv.vector[rowId], bcv.start[rowId], fieldValue, 0, bcv.length[rowId]); + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case DATE: { + fieldType = DATE; + LongColumnVector lcv = (LongColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (lcv != null) { + int rowId = lcv.isRepeating ? 0 : rowIndex; + if (!lcv.isNull[rowId]) { + fieldValue = new Date(DateWritable.daysToMillis((int) lcv.vector[rowId])); + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + case BYTE: { + fieldType = SMALLINT; + LongColumnVector lcv = (LongColumnVector) columnVector; + for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) { + fieldValue = null; + if (lcv != null) { + int rowId = lcv.isRepeating ? 0 : rowIndex; + if (!lcv.isNull[rowId]) { + fieldValue = (short) lcv.vector[rowId]; + } + } + addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(), fieldValue)); + } + break; + } + default: { + throw new UnsupportedTypeException(oi.getTypeName() + + " conversion is not supported by " + + getClass().getSimpleName()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java index f36f074..7294a02 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hawq.pxf.api.Metadata; /** * Factory class which returns optimal profile for given input format @@ -35,16 +34,20 @@ public class ProfileFactory { private static final String HIVE_RC_PROFILE = "HiveRC"; private static final String HIVE_ORC_PROFILE = "HiveORC"; private static final String HIVE_PROFILE = "Hive"; + private static final String HIVE_ORC_VECTORIZED_PROFILE = "HiveVectorizedORC"; /** * The method which returns optimal profile * * @param inputFormat input format of table/partition * @param hasComplexTypes whether record has complex types, see @EnumHiveToHawqType + * @param userProfileName profile name provided by user * @return name of optimal profile */ - public static String get(InputFormat inputFormat, boolean hasComplexTypes) { +
public static String get(InputFormat inputFormat, boolean hasComplexTypes, String userProfileName) { String profileName = null; + if (HIVE_ORC_VECTORIZED_PROFILE.equals(userProfileName)) + return userProfileName; if (inputFormat instanceof TextInputFormat && !hasComplexTypes) { profileName = HIVE_TEXT_PROFILE; } else if (inputFormat instanceof RCFileInputFormat) { @@ -58,4 +61,12 @@ return profileName; } + /** + * @see ProfileFactory#get(InputFormat, boolean, String) + */ + public static String get(InputFormat inputFormat, boolean hasComplexTypes) { + String profileName = get(inputFormat, hasComplexTypes, null); + return profileName; + } + } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java index 1c199d3..f9dbd72 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java @@ -137,6 +137,19 @@ public class BridgeOutputBuilder { return outputList; } + /** + * Creates writable output records from a batch of resolved records. + * + * @param recordsBatch batch of resolved records + * @return list of writable output records + * @throws BadRecordException if building an output record failed + */ + public LinkedList<Writable> makeVectorizedOutput(List<List<OneField>> recordsBatch) throws BadRecordException { + outputList.clear(); + if (recordsBatch != null) { + for (List<OneField> record : recordsBatch) { + if (inputData.outputFormat() == OutputFormat.GPDBWritable) { + makeGPDBWritableOutput(); + } + fillOutputRecord(record); + } + } + return outputList; + } + /** * Returns whether or not this is a partial line. * http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java index edd0a99..dd095dd 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java @@ -166,7 +166,7 @@ public class ReadBridge implements Bridge { * analyzing the exception type, and when we discover that the actual * problem was a data problem, we return the errorOutput GPDBWritable. */ - private boolean isDataException(IOException ex) { + protected boolean isDataException(IOException ex) { return (ex instanceof EOFException || ex instanceof CharacterCodingException || ex instanceof CharConversionException http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java new file mode 100644 index 0000000..ca222f1 --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java @@ -0,0 +1,102 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.BadRecordException; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadVectorizedResolver; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; + + +/** + * ReadBridge implementation which reads and resolves records in whole + * batches at a time, using a ReadVectorizedResolver. + */ +public class ReadVectorizedBridge extends ReadBridge { + + private static final Log LOG = LogFactory.getLog(ReadVectorizedBridge.class); + + public ReadVectorizedBridge(ProtocolData protData) throws Exception { + super(protData); + } + + @Override + public Writable getNext() throws Exception { + Writable output = null; + OneRow batch = null; + + if (!outputQueue.isEmpty()) { + return outputQueue.pop(); + } + + try { + while (outputQueue.isEmpty()) { + batch = fileAccessor.readNextObject(); + if (batch == null) { + output = outputBuilder.getPartialLine(); + if (output != null) { + LOG.warn("A partial record at the end of the fragment"); + } + // if there is a partial line, return it now, otherwise it + // will return null + return output; + } + + // we checked before that outputQueue is empty, so we can + // overwrite it.
+ List<List<OneField>> resolvedBatch = ((ReadVectorizedResolver) fieldsResolver).getFieldsForBatch(batch); + outputQueue = outputBuilder.makeVectorizedOutput(resolvedBatch); + if (!outputQueue.isEmpty()) { + output = outputQueue.pop(); + break; + } + } + } catch (IOException ex) { + if (!isDataException(ex)) { + throw ex; + } + output = outputBuilder.getErrorOutput(ex); + } catch (BadRecordException ex) { + String row_info = "null"; + if (batch != null) { + row_info = batch.toString(); + } + if (ex.getCause() != null) { + LOG.debug("BadRecordException " + ex.getCause().toString() + + ": " + row_info); + } else { + LOG.debug(ex.toString() + ": " + row_info); + } + output = outputBuilder.getErrorOutput(ex); + } + + return output; + } + + @Override + public void endIteration() throws Exception { + fileAccessor.closeForRead(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java index 4294e09..027663b 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java @@ -44,6 +44,7 @@ import org.apache.hawq.pxf.service.AggBridge; import org.apache.hawq.pxf.service.Bridge; import org.apache.hawq.pxf.service.ReadBridge; import org.apache.hawq.pxf.service.ReadSamplingBridge; +import org.apache.hawq.pxf.service.ReadVectorizedBridge; import org.apache.hawq.pxf.service.io.Writable; import org.apache.hawq.pxf.service.utilities.ProtocolData; import org.apache.hawq.pxf.service.utilities.SecuredHDFS; @@ -101,6 +102,8 @@ public class BridgeResource extends RestResource { bridge = new ReadSamplingBridge(protData); } else if (Utilities.useAggBridge(protData)) { bridge = new AggBridge(protData); + } else if (Utilities.useVectorization(protData)) { + bridge = new ReadVectorizedBridge(protData); } else { bridge = new ReadBridge(protData); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml index f076ead..a8666eb 100644 --- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml +++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml @@ -101,6 +101,17 @@ under the License.
<outputFormat>org.apache.hawq.pxf.service.io.GPDBWritable</outputFormat> </plugins> </profile> + <profile> + <name>HiveVectorizedORC</name> + <description>This profile is suitable for reading Hive tables stored as ORC files, resolving batches of rows at once rather than single records</description> + <plugins> + <fragmenter>org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter</fragmenter> + <accessor>org.apache.hawq.pxf.plugins.hive.HiveORCVectorizedAccessor</accessor> + <resolver>org.apache.hawq.pxf.plugins.hive.HiveORCVectorizedResolver</resolver> + <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata> + <outputFormat>org.apache.hawq.pxf.service.io.GPDBWritable</outputFormat> + </plugins> + </profile> <profile> <name>HdfsTextSimple</name> <description>This profile is suitable for using when reading delimited single line records from plain text files