HAWQ-931. ORC optimized profile for PPD/CP (predicate pushdown and column projection)
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/98a302da Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/98a302da Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/98a302da Branch: refs/heads/HAWQ-931 Commit: 98a302da09db7952d782809c784c7a820ee579ef Parents: 8906240 Author: Shivram Mani <[email protected]> Authored: Wed Jul 27 17:48:59 2016 -0700 Committer: Shivram Mani <[email protected]> Committed: Tue Aug 2 15:12:58 2016 -0700 ---------------------------------------------------------------------- pxf/build.gradle | 1 + pxf/gradle.properties | 3 +- .../pxf/api/utilities/ColumnDescriptor.java | 28 +- .../plugins/hive/HiveColumnarSerdeResolver.java | 7 +- .../plugins/hive/HiveInputFormatFragmenter.java | 16 +- .../hawq/pxf/plugins/hive/HiveORCAccessor.java | 170 +++++++ .../pxf/plugins/hive/HiveORCSerdeResolver.java | 439 +++++++++++++++++++ .../hawq/pxf/service/rest/RestResource.java | 8 +- .../pxf/service/utilities/ProtocolData.java | 30 +- .../src/main/resources/pxf-profiles-default.xml | 14 + 10 files changed, 707 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/build.gradle ---------------------------------------------------------------------- diff --git a/pxf/build.gradle b/pxf/build.gradle index 23d688f..cd29c01 100644 --- a/pxf/build.gradle +++ b/pxf/build.gradle @@ -314,6 +314,7 @@ project('pxf-hive') { compile "org.apache.hive:hive-metastore:$hiveVersion" compile "org.apache.hive:hive-common:$hiveVersion" compile "org.apache.hive:hive-serde:$hiveVersion" + compile "org.apache.orc:orc-core:$orcVersion" testCompile 'pl.pragmatists:JUnitParams:1.0.2' configurations { // Remove hive-exec from unit tests as it causes VerifyError http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/gradle.properties ---------------------------------------------------------------------- diff --git a/pxf/gradle.properties b/pxf/gradle.properties index 6827b89..a601936 100644 --- a/pxf/gradle.properties +++ b/pxf/gradle.properties @@ -23,4 +23,5 @@ hiveVersion=1.2.1 hbaseVersionJar=1.1.2 hbaseVersionRPM=1.1.2 tomcatVersion=7.0.62 -pxfProtocolVersion=v14 \ No newline at end of file +pxfProtocolVersion=v14 +orcVersion=1.1.1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java index baaca1d..4b9dc9c 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java @@ -30,6 +30,7 @@ public class ColumnDescriptor { String gpdbColumnName; String gpdbColumnTypeName; int gpdbColumnIndex; + boolean isProjected; /** * Reserved word for a table record key. @@ -50,6 +51,21 @@ public class ColumnDescriptor { gpdbColumnTypeName = typename; gpdbColumnName = name; gpdbColumnIndex = index; + isProjected = true; + } + + /** + * Constructs a ColumnDescriptor. 
+ * + * @param name column name + * @param typecode OID + * @param index column index + * @param typename type name + * @param isProj type boolean + */ + public ColumnDescriptor(String name, int typecode, int index, String typename, boolean isProj) { + this(name, typecode, index, typename); + isProjected = isProj; } /** @@ -62,6 +78,7 @@ public class ColumnDescriptor { this.gpdbColumnName = copy.gpdbColumnName; this.gpdbColumnIndex = copy.gpdbColumnIndex; this.gpdbColumnTypeName = copy.gpdbColumnTypeName; + this.isProjected = copy.isProjected; } public String columnName() { @@ -89,11 +106,20 @@ public class ColumnDescriptor { return RECORD_KEY_NAME.equalsIgnoreCase(gpdbColumnName); } + public boolean isProjected() { + return isProjected; + } + + public void setProjected(boolean projected) { + isProjected = projected; + } + @Override public String toString() { return "ColumnDescriptor [gpdbColumnTypeCode=" + gpdbColumnTypeCode + ", gpdbColumnName=" + gpdbColumnName + ", gpdbColumnTypeName=" + gpdbColumnTypeName - + ", gpdbColumnIndex=" + gpdbColumnIndex + "]"; + + ", gpdbColumnIndex=" + gpdbColumnIndex + + ", isProjected=" + isProjected + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java index d298bac..497ee2e 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java @@ -19,7 +19,6 @@ package org.apache.hawq.pxf.plugins.hive; * under the License. 
*/ - import org.apache.hawq.pxf.api.BadRecordException; import org.apache.hawq.pxf.api.OneField; import org.apache.hawq.pxf.api.OneRow; @@ -31,6 +30,7 @@ import org.apache.hawq.pxf.api.utilities.Utilities; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedColumnarSerDe; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDeBase; @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; + import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; @@ -76,6 +77,8 @@ public class HiveColumnarSerdeResolver extends HiveResolver { serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE; } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) { serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE; + } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE.name())) { + serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE; } else { throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr); } @@ -138,6 +141,8 @@ public class HiveColumnarSerdeResolver extends HiveResolver { deserializer = new ColumnarSerDe(); } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) { deserializer = new LazyBinaryColumnarSerDe(); + } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE) { + deserializer = new VectorizedColumnarSerDe(); } else { throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */ } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java index a666b8b..955aa9a 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java @@ -56,9 +56,12 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat"; static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat"; + static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"; static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"; static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; + static final String STR_VECTORIZED_ORC_SERDE = 
"org.apache.hadoop.hive.ql.io.orc.VectorizedOrcSerde"; + static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; private static final int EXPECTED_NUM_OF_TOKS = 3; public static final int TOK_SERDE = 0; public static final int TOK_KEYS = 1; @@ -67,14 +70,17 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { /** Defines the Hive input formats currently supported in pxf */ public enum PXF_HIVE_INPUT_FORMATS { RC_FILE_INPUT_FORMAT, - TEXT_FILE_INPUT_FORMAT + TEXT_FILE_INPUT_FORMAT, + ORC_FILE_INPUT_FORMAT } /** Defines the Hive serializers (serde classes) currently supported in pxf */ public enum PXF_HIVE_SERDES { COLUMNAR_SERDE, LAZY_BINARY_COLUMNAR_SERDE, - LAZY_SIMPLE_SERDE + LAZY_SIMPLE_SERDE, + ORC_SERDE, + VECTORIZED_ORC_SERDE } /** @@ -234,6 +240,8 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { return PXF_HIVE_INPUT_FORMATS.RC_FILE_INPUT_FORMAT.name(); case STR_TEXT_FILE_INPUT_FORMAT: return PXF_HIVE_INPUT_FORMATS.TEXT_FILE_INPUT_FORMAT.name(); + case STR_ORC_FILE_INPUT_FORMAT: + return PXF_HIVE_INPUT_FORMATS.ORC_FILE_INPUT_FORMAT.name(); default: throw new IllegalArgumentException( "HiveInputFormatFragmenter does not yet support " @@ -259,6 +267,10 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name(); case STR_LAZY_SIMPLE_SERDE: return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name(); + case STR_ORC_SERDE: + return PXF_HIVE_SERDES.ORC_SERDE.name(); + case STR_VECTORIZED_ORC_SERDE: + return PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE.name(); default: throw new UnsupportedTypeException( "HiveInputFormatFragmenter does not yet support " http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java new file mode 100644 index 0000000..43c48b2 --- /dev/null +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java @@ -0,0 +1,170 @@ +package org.apache.hawq.pxf.plugins.hive; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hawq.pxf.api.FilterParser; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.commons.lang.StringUtils; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; + +/** + * Specialization of HiveAccessor for a Hive table that stores only ORC files. + * This class replaces the generic HiveAccessor for a case where a table is stored entirely as ORC files. + * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver} + */ +public class HiveORCAccessor extends HiveAccessor { + + private RecordReader batchReader = null; + private Reader reader = null; + private VectorizedRowBatch batch = null; + + private final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids"; + private final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns"; + private final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names"; + private final String SARG_PUSHDOWN = "sarg.pushdown"; + + /** + * Constructs a HiveRCFileAccessor. + * + * @param input input containing user data + * @throws Exception if user data was wrong + */ + public HiveORCAccessor(InputData input) throws Exception { + super(input, new OrcInputFormat()); + String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.ORC_SERDE.name(), PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE.name()); + initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]); + filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]); + } + + @Override + public boolean openForRead() throws Exception { + addColumns(); + addFilters(); + return super.openForRead(); + } + + @Override + protected Object getReader(JobConf jobConf, InputSplit split) + throws IOException { + return inputFormat.getRecordReader(split, jobConf, Reporter.NULL); + } + + /** + * Adds the table tuple description to JobConf ojbect + * so only these columns will be returned. + */ + private void addColumns() throws Exception { + + List<String> colIds = new ArrayList<String>(); + List<String> colNames = new ArrayList<String>(); + for(ColumnDescriptor col: inputData.getTupleDescription()) { + if(col.isProjected()) { + colIds.add(String.valueOf(col.columnIndex())); + colNames.add(col.columnName()); + } + } + jobConf.set(READ_ALL_COLUMNS, "false"); + jobConf.set(READ_COLUMN_IDS_CONF_STR, StringUtils.join(colIds, ",")); + jobConf.set(READ_COLUMN_NAMES_CONF_STR, StringUtils.join(colNames, ",")); + } + + /** + * Uses {@link HiveFilterBuilder} to translate a filter string into a + * Hive {@link SearchArgument} object. 
The result is added as a filter to + * JobConf object + */ + private void addFilters() throws Exception { + if (!inputData.hasFilter()) { + return; + } + + /* Predicate pushdown configuration */ + String filterStr = inputData.getFilterString(); + HiveFilterBuilder eval = new HiveFilterBuilder(inputData); + Object filter = eval.getFilterObject(filterStr); + + SearchArgument.Builder filterBuilder = SearchArgumentFactory.newBuilder(); + filterBuilder.startAnd(); + if (filter instanceof List) { + for (Object f : (List<?>) filter) { + buildArgument(filterBuilder, f); + } + } else { + buildArgument(filterBuilder, filter); + } + filterBuilder.end(); + SearchArgument sarg = filterBuilder.build(); + jobConf.set(SARG_PUSHDOWN, sarg.toKryo()); + } + + private void buildArgument(SearchArgument.Builder builder, Object filterObj) { + /* The below functions will not be compatible and requires update with Hive 2.0 APIs */ + FilterParser.BasicFilter filter = (FilterParser.BasicFilter) filterObj; + int filterColumnIndex = filter.getColumn().index(); + Object filterValue = filter.getConstant().constant(); + ColumnDescriptor filterColumn = inputData.getColumn(filterColumnIndex); + String filterColumnName = filterColumn.columnName(); + + switch(filter.getOperation()) { + case HDOP_LT: + builder.lessThan(filterColumnName, filterValue); + break; + case HDOP_GT: + builder.startNot().lessThanEquals(filterColumnName, filterValue).end(); + break; + case HDOP_LE: + builder.lessThanEquals(filterColumnName, filterValue); + break; + case HDOP_GE: + builder.startNot().lessThanEquals(filterColumnName, filterValue).end(); + break; + case HDOP_EQ: + builder.equals(filterColumnName, filterValue); + break; + case HDOP_NE: + builder.startNot().equals(filterColumnName, filterValue).end(); + break; + case HDOP_LIKE: + break; + } + return; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java new file mode 100644 index 0000000..6ac4e70 --- /dev/null +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java @@ -0,0 +1,439 @@ +package org.apache.hawq.pxf.plugins.hive; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.commons.lang.CharUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.objectinspector.*; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hawq.pxf.api.BadRecordException; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.UnsupportedTypeException; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Utilities; +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.*; + +import static org.apache.hawq.pxf.api.io.DataType.*; +import static org.apache.hawq.pxf.api.io.DataType.DATE; +import static org.apache.hawq.pxf.api.io.DataType.SMALLINT; + +/** + * Specialized HiveResolver for a Hive table stored as RC file. + * Use together with HiveInputFormatFragmenter/HiveRCFileAccessor. + */ +public class HiveORCSerdeResolver extends HiveResolver { + private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class); + private OrcSerde deserializer; + private boolean firstColumn; + private StringBuilder builder; + private StringBuilder parts; + private int numberOfPartitions; + private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType; + private static final String MAPKEY_DELIM = ":"; + private static final String COLLECTION_DELIM = ","; + private String collectionDelim; + private String mapkeyDelim; + + public HiveORCSerdeResolver(InputData input) throws Exception { + super(input); + } + + /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */ + @Override + void parseUserData(InputData input) throws Exception { + String[] toks = HiveInputFormatFragmenter.parseToks(input); + String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE]; + if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) { + serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE; + } else { + throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr); + } + parts = new StringBuilder(); + partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS]; + parseDelimiterChar(input); + collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM + : input.getUserProperty("COLLECTION_DELIM"); + mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM + : input.getUserProperty("MAPKEY_DELIM"); + } + + @Override + void initPartitionFields() { + numberOfPartitions = initPartitionFields(parts); + } + + /** + * getFields returns a singleton list of OneField item. + * OneField item contains two fields: an integer representing the VARCHAR type and a Java + * Object representing the field value. 
+ */ + @Override + public List<OneField> getFields(OneRow onerow) throws Exception { + + Object tuple = deserializer.deserialize((Writable) onerow.getData()); + // Each Hive record is a Struct + StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector(); + List<OneField> record = traverseStruct(tuple, soi, false); + + return record; + + } + + /* + * Get and init the deserializer for the records of this Hive data fragment. + * Suppress Warnings added because deserializer.initialize is an abstract function that is deprecated + * but its implementations (ColumnarSerDe, LazyBinaryColumnarSerDe) still use the deprecated interface. + */ + @SuppressWarnings("deprecation") + @Override + void initSerde(InputData input) throws Exception { + Properties serdeProperties = new Properties(); + int numberOfDataColumns = input.getColumns() - numberOfPartitions; + + LOG.debug("Serde number of columns is " + numberOfDataColumns); + + StringBuilder columnNames = new StringBuilder(numberOfDataColumns * 2); // column + delimiter + StringBuilder columnTypes = new StringBuilder(numberOfDataColumns * 2); // column + delimiter + String delim = ""; + for (int i = 0; i < numberOfDataColumns; i++) { + ColumnDescriptor column = input.getColumn(i); + String columnName = column.columnName(); + String columnType = HiveInputFormatFragmenter.toHiveType(DataType.get(column.columnTypeCode()), columnName); + columnNames.append(delim).append(columnName); + columnTypes.append(delim).append(columnType); + delim = ","; + } + serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString()); + serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString()); + + if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) { + deserializer = new OrcSerde(); + } else { + throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */ + } + + deserializer.initialize(new JobConf(new Configuration(), HiveORCSerdeResolver.class), serdeProperties); + } + + /* + * If the object representing the whole record is null or if an object + * representing a composite sub-object (map, list,..) is null - then + * BadRecordException will be thrown. If a primitive field value is null, + * then a null will appear for the field in the record in the query result. 
+ */ + private void traverseTuple(Object obj, ObjectInspector objInspector, + List<OneField> record, boolean toFlatten) + throws IOException, BadRecordException { + ObjectInspector.Category category = objInspector.getCategory(); + if ((obj == null) && (category != ObjectInspector.Category.PRIMITIVE)) { + throw new BadRecordException("NULL Hive composite object"); + } + switch (category) { + case PRIMITIVE: + resolvePrimitive(obj, (PrimitiveObjectInspector) objInspector, + record, toFlatten); + break; + case LIST: + List<OneField> listRecord = traverseList(obj, + (ListObjectInspector) objInspector); + addOneFieldToRecord(record, TEXT, String.format("[%s]", + HdfsUtilities.toString(listRecord, collectionDelim))); + break; + case MAP: + List<OneField> mapRecord = traverseMap(obj, + (MapObjectInspector) objInspector); + addOneFieldToRecord(record, TEXT, String.format("{%s}", + HdfsUtilities.toString(mapRecord, collectionDelim))); + break; + case STRUCT: + List<OneField> structRecord = traverseStruct(obj, + (StructObjectInspector) objInspector, true); + addOneFieldToRecord(record, TEXT, String.format("{%s}", + HdfsUtilities.toString(structRecord, collectionDelim))); + break; + case UNION: + List<OneField> unionRecord = traverseUnion(obj, + (UnionObjectInspector) objInspector); + addOneFieldToRecord(record, TEXT, String.format("[%s]", + HdfsUtilities.toString(unionRecord, collectionDelim))); + break; + default: + throw new UnsupportedTypeException("Unknown category type: " + + objInspector.getCategory()); + } + } + + private List<OneField> traverseUnion(Object obj, UnionObjectInspector uoi) + throws BadRecordException, IOException { + List<OneField> unionRecord = new LinkedList<>(); + List<? extends ObjectInspector> ois = uoi.getObjectInspectors(); + if (ois == null) { + throw new BadRecordException( + "Illegal value NULL for Hive data type Union"); + } + traverseTuple(uoi.getField(obj), ois.get(uoi.getTag(obj)), unionRecord, + true); + return unionRecord; + } + + private List<OneField> traverseList(Object obj, ListObjectInspector loi) + throws BadRecordException, IOException { + List<OneField> listRecord = new LinkedList<>(); + List<?> list = loi.getList(obj); + ObjectInspector eoi = loi.getListElementObjectInspector(); + if (list == null) { + throw new BadRecordException( + "Illegal value NULL for Hive data type List"); + } + for (Object object : list) { + traverseTuple(object, eoi, listRecord, true); + } + return listRecord; + } + + private List<OneField> traverseStruct(Object struct, + StructObjectInspector soi, + boolean toFlatten) + throws BadRecordException, IOException { + List<? extends StructField> fields = soi.getAllStructFieldRefs(); + List<Object> structFields = soi.getStructFieldsDataAsList(struct); + if (structFields == null) { + throw new BadRecordException( + "Illegal value NULL for Hive data type Struct"); + } + List<OneField> structRecord = new LinkedList<>(); + List<OneField> complexRecord = new LinkedList<>(); + for (int i = 0; i < structFields.size(); i++) { + if (toFlatten) { + complexRecord.add(new OneField(TEXT.getOID(), String.format( + "\"%s\"", fields.get(i).getFieldName()))); + } + traverseTuple(structFields.get(i), + fields.get(i).getFieldObjectInspector(), complexRecord, + toFlatten); + if (toFlatten) { + addOneFieldToRecord(structRecord, TEXT, + HdfsUtilities.toString(complexRecord, mapkeyDelim)); + complexRecord.clear(); + } + } + return toFlatten ? 
structRecord : complexRecord; + } + + private List<OneField> traverseMap(Object obj, MapObjectInspector moi) + throws BadRecordException, IOException { + List<OneField> complexRecord = new LinkedList<>(); + List<OneField> mapRecord = new LinkedList<>(); + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + Map<?, ?> map = moi.getMap(obj); + if (map == null) { + throw new BadRecordException( + "Illegal value NULL for Hive data type Map"); + } else if (map.isEmpty()) { + traverseTuple(null, koi, complexRecord, true); + traverseTuple(null, voi, complexRecord, true); + addOneFieldToRecord(mapRecord, TEXT, + HdfsUtilities.toString(complexRecord, mapkeyDelim)); + } else { + for (Map.Entry<?, ?> entry : map.entrySet()) { + traverseTuple(entry.getKey(), koi, complexRecord, true); + traverseTuple(entry.getValue(), voi, complexRecord, true); + addOneFieldToRecord(mapRecord, TEXT, + HdfsUtilities.toString(complexRecord, mapkeyDelim)); + complexRecord.clear(); + } + } + return mapRecord; + } + + private void resolvePrimitive(Object o, PrimitiveObjectInspector oi, + List<OneField> record, boolean toFlatten) + throws IOException { + Object val; + switch (oi.getPrimitiveCategory()) { + case BOOLEAN: { + val = (o != null) ? ((BooleanObjectInspector) oi).get(o) : null; + addOneFieldToRecord(record, BOOLEAN, val); + break; + } + case SHORT: { + val = (o != null) ? ((ShortObjectInspector) oi).get(o) : null; + addOneFieldToRecord(record, SMALLINT, val); + break; + } + case INT: { + val = (o != null) ? ((IntObjectInspector) oi).get(o) : null; + addOneFieldToRecord(record, INTEGER, val); + break; + } + case LONG: { + val = (o != null) ? ((LongObjectInspector) oi).get(o) : null; + addOneFieldToRecord(record, BIGINT, val); + break; + } + case FLOAT: { + val = (o != null) ? ((FloatObjectInspector) oi).get(o) : null; + addOneFieldToRecord(record, REAL, val); + break; + } + case DOUBLE: { + val = (o != null) ? ((DoubleObjectInspector) oi).get(o) : null; + addOneFieldToRecord(record, FLOAT8, val); + break; + } + case DECIMAL: { + String sVal = null; + if (o != null) { + HiveDecimal hd = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o); + if (hd != null) { + BigDecimal bd = hd.bigDecimalValue(); + sVal = bd.toString(); + } + } + addOneFieldToRecord(record, NUMERIC, sVal); + break; + } + case STRING: { + val = (o != null) ? ((StringObjectInspector) oi).getPrimitiveJavaObject(o) + : null; + addOneFieldToRecord(record, TEXT, + toFlatten ? String.format("\"%s\"", val) : val); + break; + } + case VARCHAR: + val = (o != null) ? ((HiveVarcharObjectInspector) oi).getPrimitiveJavaObject(o) + : null; + addOneFieldToRecord(record, VARCHAR, + toFlatten ? String.format("\"%s\"", val) : val); + break; + case CHAR: + val = (o != null) ? ((HiveCharObjectInspector) oi).getPrimitiveJavaObject(o) + : null; + addOneFieldToRecord(record, BPCHAR, + toFlatten ? String.format("\"%s\"", val) : val); + break; + case BINARY: { + byte[] toEncode = null; + if (o != null) { + BytesWritable bw = ((BinaryObjectInspector) oi).getPrimitiveWritableObject(o); + toEncode = new byte[bw.getLength()]; + System.arraycopy(bw.getBytes(), 0, toEncode, 0, + bw.getLength()); + } + addOneFieldToRecord(record, BYTEA, toEncode); + break; + } + case TIMESTAMP: { + val = (o != null) ? ((TimestampObjectInspector) oi).getPrimitiveJavaObject(o) + : null; + addOneFieldToRecord(record, TIMESTAMP, val); + break; + } + case DATE: + val = (o != null) ? 
((DateObjectInspector) oi).getPrimitiveJavaObject(o) + : null; + addOneFieldToRecord(record, DATE, val); + break; + case BYTE: { /* TINYINT */ + val = (o != null) ? new Short(((ByteObjectInspector) oi).get(o)) + : null; + addOneFieldToRecord(record, SMALLINT, val); + break; + } + default: { + throw new UnsupportedTypeException(oi.getTypeName() + + " conversion is not supported by " + + getClass().getSimpleName()); + } + } + } + + private void addOneFieldToRecord(List<OneField> record, + DataType gpdbWritableType, Object val) { + record.add(new OneField(gpdbWritableType.getOID(), val)); + } + + /* + * Gets the delimiter character from the URL, verify and store it. Must be a + * single ascii character (same restriction as Hawq's). If a hex + * representation was passed, convert it to its char. + */ + void parseDelimiterChar(InputData input) { + + String userDelim = input.getUserProperty("DELIMITER"); + + if (userDelim == null) { + throw new IllegalArgumentException("DELIMITER is a required option"); + } + + final int VALID_LENGTH = 1; + final int VALID_LENGTH_HEX = 4; + + if (userDelim.startsWith("\\x")) { // hexadecimal sequence + + if (userDelim.length() != VALID_LENGTH_HEX) { + throw new IllegalArgumentException( + "Invalid hexdecimal value for delimiter (got" + + userDelim + ")"); + } + + delimiter = (char) Integer.parseInt( + userDelim.substring(2, VALID_LENGTH_HEX), 16); + + if (!CharUtils.isAscii(delimiter)) { + throw new IllegalArgumentException( + "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII " + + delimiter + ")"); + } + + return; + } + + if (userDelim.length() != VALID_LENGTH) { + throw new IllegalArgumentException( + "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got " + + userDelim + ")"); + } + + if (!CharUtils.isAscii(userDelim.charAt(0))) { + throw new IllegalArgumentException( + "Invalid delimiter value. 
Must be a single ASCII character, or a hexadecimal sequence (got non ASCII " + + userDelim + ")"); + } + + delimiter = userDelim.charAt(0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java index 60bb31e..633e78c 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java @@ -24,6 +24,7 @@ import javax.ws.rs.core.MultivaluedMap; import org.apache.commons.codec.CharEncoding; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.StringUtils; import java.io.UnsupportedEncodingException; import java.util.List; @@ -56,7 +57,12 @@ public abstract class RestResource { String key = entry.getKey(); List<String> values = entry.getValue(); if (values != null) { - String value = values.get(0); + String value; + if(values.size() > 1) { + value = StringUtils.join(values, ","); + } else { + value = values.get(0); + } if (value != null) { // converting to value UTF-8 encoding value = new String(value.getBytes(CharEncoding.ISO_8859_1), http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java index 0337937..f492378 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java @@ -383,16 +383,40 @@ public class ProtocolData extends InputData { /* * Sets the tuple description for the record + * Attribute Projection information is optional */ void parseTupleDescription() { + + /* Process column projection info */ + String columnProjStr = getOptionalProperty("ATTRS-PROJ"); + List<Integer> columnProjList = new ArrayList<Integer>(); + if(columnProjStr != null) { + int columnProj = Integer.parseInt(columnProjStr); + if(columnProj > 0) { + String columnProjIndexStr = getProperty("ATTRS-PROJ-IDX"); + String columnProjIdx[] = columnProjIndexStr.split(","); + for(int i = 0; i < columnProj; i++) { + columnProjList.add(Integer.valueOf(columnProjIdx[i])); + } + } else { + /* This is a special case to handle aggregate queries not related to any specific column + * eg: count(*) queries. 
*/ + columnProjList.add(0); + } + } + int columns = getIntProperty("ATTRS"); for (int i = 0; i < columns; ++i) { String columnName = getProperty("ATTR-NAME" + i); int columnTypeCode = getIntProperty("ATTR-TYPECODE" + i); String columnTypeName = getProperty("ATTR-TYPENAME" + i); - - ColumnDescriptor column = new ColumnDescriptor(columnName, - columnTypeCode, i, columnTypeName); + ColumnDescriptor column; + if(columnProjStr != null) { + column = new ColumnDescriptor(columnName, columnTypeCode, i, columnTypeName, columnProjList.contains(i)); + } else { + /* For data formats that don't support column projection */ + column = new ColumnDescriptor(columnName, columnTypeCode, i, columnTypeName); + } tupleDescription.add(column); if (columnName.equalsIgnoreCase(ColumnDescriptor.RECORD_KEY_NAME)) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml index d72df94..9025cc1 100644 --- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml +++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml @@ -80,6 +80,20 @@ under the License. </plugins> </profile> <profile> + <name>HiveORC</name> + <description>This profile is suitable only for Hive tables stored in ORC files + and serialized with either the ColumnarSerDe or the LazyBinaryColumnarSerDe. + It is much faster than the general purpose Hive profile. + DELIMITER parameter is mandatory. + </description> + <plugins> + <fragmenter>org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter</fragmenter> + <accessor>org.apache.hawq.pxf.plugins.hive.HiveORCAccessor</accessor> + <resolver>org.apache.hawq.pxf.plugins.hive.HiveORCSerdeResolver</resolver> + <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata> + </plugins> + </profile> + <profile> <name>HdfsTextSimple</name> <description>This profile is suitable for using when reading delimited single line records from plain text files on HDFS
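----------------------------------------------------------------------
For readers following the patch, a minimal Java sketch of the two mechanisms it adds — predicate pushdown (PPD) through a Hive SearchArgument and column projection (CP) through the hive.io.file.readcolumn.* settings — as performed by HiveORCAccessor.addFilters() and addColumns() above. The class name SargPushdownSketch, the sample filter (a < 10 AND b = 'foo'), and the literal types are illustrative assumptions, not part of the commit; the builder calls follow the Hive 1.2.x SearchArgument API that the patch itself uses.

    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
    import org.apache.hadoop.mapred.JobConf;

    // Illustrative sketch only; names and sample values are hypothetical.
    public class SargPushdownSketch {
        public static void main(String[] args) {
            // Predicate pushdown: equivalent of a HAWQ filter "a < 10 AND b = 'foo'".
            SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
            builder.startAnd();
            builder.lessThan("a", 10L);   // HDOP_LT
            builder.equals("b", "foo");   // HDOP_EQ
            builder.end();
            SearchArgument sarg = builder.build();

            JobConf jobConf = new JobConf();
            // Hand the serialized SearchArgument to the ORC reader,
            // exactly as addFilters() does with SARG_PUSHDOWN.
            jobConf.set("sarg.pushdown", sarg.toKryo());

            // Column projection: read only columns a (index 0) and b (index 1),
            // mirroring what addColumns() derives from ColumnDescriptor.isProjected().
            jobConf.set("hive.io.file.read.all.columns", "false");
            jobConf.set("hive.io.file.readcolumn.ids", "0,1");
            jobConf.set("hive.io.file.readcolumn.names", "a,b");

            System.out.println(sarg);
        }
    }

In the accessor itself the SearchArgument is assembled from HAWQ's serialized filter string via HiveFilterBuilder, and the projected column ids/names come from the new ColumnDescriptor.isProjected() flag populated by ProtocolData.parseTupleDescription().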