HAWQ-1228. Use profile based on file format in HCatalog integration (HiveRC, HiveText profiles).
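This change lets PXF choose the serving profile from a Hive table's underlying file format, and from whether its schema uses complex types, instead of always routing HCatalog queries through the generic Hive profile. The selection logic lives in the new pxf-hive ProfileFactory added by this commit (see the file list and the fragmenter hunks below). The sketch here only illustrates the idea; the exact fallback rules and the HiveORC branch are assumptions, not a copy of the committed code:

    import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.TextInputFormat;

    public final class ProfileSelectionSketch {
        // Maps a partition's InputFormat to a PXF profile name. Complex-typed
        // schemas (array/map/struct/uniontype) stay on the generic "Hive"
        // profile; that is what the hasComplexTypes flag threaded through
        // HiveDataFragmenter and HiveMetadataFetcher below is for.
        public static String get(InputFormat<?, ?> inputFormat, boolean hasComplexTypes) {
            if (!hasComplexTypes) {
                if (inputFormat instanceof TextInputFormat) {
                    return "HiveText"; // delimited text files
                } else if (inputFormat instanceof RCFileInputFormat) {
                    return "HiveRC";   // RCFile with a columnar serde
                } else if (inputFormat instanceof OrcInputFormat) {
                    return "HiveORC";  // ORC files
                }
            }
            return "Hive";             // generic, serde-driven profile
        }
    }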
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/6fa1ced2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/6fa1ced2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/6fa1ced2

Branch: refs/heads/2.1.0.0-incubating
Commit: 6fa1ced20e8bb2820b73e6904f77c4b4a1ed6de2
Parents: aac8868
Author: Oleksandr Diachenko <[email protected]>
Authored: Mon Jan 30 23:38:06 2017 -0800
Committer: Oleksandr Diachenko <[email protected]>
Committed: Mon Jan 30 23:38:50 2017 -0800

----------------------------------------------------------------------
 pxf/gradle.properties                            |   2 +-
 .../java/org/apache/hawq/pxf/api/Metadata.java   |  51 +++-
 .../org/apache/hawq/pxf/api/OutputFormat.java    |  37 ++-
 .../hawq/pxf/api/utilities/InputData.java        |   1 +
 .../org/apache/hawq/pxf/api/MetadataTest.java    |   2 +-
 .../hawq/pxf/plugins/hive/HiveAccessor.java      |  14 +-
 .../plugins/hive/HiveColumnarSerdeResolver.java  |  60 ++---
 .../pxf/plugins/hive/HiveDataFragmenter.java     |  28 +-
 .../plugins/hive/HiveInputFormatFragmenter.java  |  41 ---
 .../pxf/plugins/hive/HiveLineBreakAccessor.java  |  10 +-
 .../pxf/plugins/hive/HiveMetadataFetcher.java    |  85 +++---
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java   |   9 +-
 .../pxf/plugins/hive/HiveORCSerdeResolver.java   |  44 +---
 .../pxf/plugins/hive/HiveRCFileAccessor.java     |  10 +-
 .../hawq/pxf/plugins/hive/HiveResolver.java      | 107 ++++----
 .../plugins/hive/HiveStringPassResolver.java     |  39 ++-
 .../hawq/pxf/plugins/hive/HiveUserData.java      | 135 ++++++++++
 .../hive/utilities/EnumHiveToHawqType.java       |  31 ++-
 .../plugins/hive/utilities/HiveUtilities.java    | 263 +++++++++++++++----
 .../plugins/hive/utilities/ProfileFactory.java   |  61 +++++
 .../plugins/hive/HiveMetadataFetcherTest.java    |   3 +
 .../pxf/plugins/hive/HiveORCAccessorTest.java    |   9 +-
 .../hive/utilities/HiveUtilitiesTest.java        |  53 ++++
 .../hive/utilities/ProfileFactoryTest.java       |  65 +++++
 .../hawq/pxf/service/BridgeOutputBuilder.java    |   8 +-
 .../pxf/service/MetadataResponseFormatter.java   |   3 +-
 .../apache/hawq/pxf/service/ProfileFactory.java  |  45 ----
 .../hawq/pxf/service/rest/MetadataResource.java  |   9 +-
 .../hawq/pxf/service/rest/VersionResource.java   |   2 +-
 .../pxf/service/utilities/ProtocolData.java      |  22 +-
 .../src/main/resources/pxf-profiles-default.xml  |  14 +-
 .../service/MetadataResponseFormatterTest.java   |  16 +-
 src/backend/access/external/fileam.c             |   3 +
 src/backend/access/external/pxfheaders.c         |  21 +-
 .../access/external/test/pxfheaders_test.c       |  18 ++
 src/backend/catalog/external/externalmd.c        | 137 +++++++---
 src/bin/gpfusion/gpbridgeapi.c                   |   6 +-
 src/include/access/hd_work_mgr.h                 |   2 +
 src/include/access/pxfheaders.h                  |   1 +
 src/include/access/pxfuriparser.h                |   2 +-
 src/include/catalog/external/itemmd.h            |   5 +
 src/include/catalog/pg_exttable.h                |  14 +-
 .../regress/data/hcatalog/single_table.json      |   2 +-
 .../data/hcatalog/single_table_text.json         |   1 +
 src/test/regress/input/json_load.source          |  12 +-
 src/test/regress/json_utils.c                    |  24 +-
 src/test/regress/output/json_load.source         |  35 ++-
 47 files changed, 1109 insertions(+), 453 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/gradle.properties
----------------------------------------------------------------------
diff --git a/pxf/gradle.properties b/pxf/gradle.properties
index b003c56..2af17ef 100644
--- a/pxf/gradle.properties
+++ b/pxf/gradle.properties
@@ -23,5 +23,5 @@ hiveVersion=1.2.1
 hbaseVersionJar=1.1.2
 hbaseVersionRPM=1.1.2
 tomcatVersion=7.0.62
-pxfProtocolVersion=v14
+pxfProtocolVersion=v15
 osFamily=el6

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
index 9e1c137..bb22d41 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
@@ -22,6 +22,8 @@ package org.apache.hawq.pxf.api;

 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;

 import org.apache.hawq.pxf.api.utilities.EnumHawqType;
 import org.apache.commons.lang.StringUtils;
@@ -68,14 +70,16 @@ public class Metadata {
     }

     /**
-     * Class representing item field - name, type, source type, modifiers.
+     * Class representing item field - name, type, source type, is complex type?, modifiers.
      * Type - exposed type of field
      * Source type - type of field in underlying source
+     * Is complex type - whether source type is complex type
      * Modifiers - additional attributes which describe type or field
      */
     public static class Field {
         private String name;
         private EnumHawqType type; // field type which PXF exposes
+        private boolean isComplexType; // whether source field's type is complex
         private String sourceType; // field type PXF reads from
         private String[] modifiers; // type modifiers, optional field

@@ -91,12 +95,17 @@ public class Metadata {
             this.sourceType = sourceType;
         }

-        public Field(String name, EnumHawqType type, String sourceType,
-                String[] modifiers) {
+        public Field(String name, EnumHawqType type, String sourceType, String[] modifiers) {
             this(name, type, sourceType);
             this.modifiers = modifiers;
         }

+        public Field(String name, EnumHawqType type, boolean isComplexType, String sourceType, String[] modifiers) {
+            this(name, type, sourceType);
+            this.modifiers = modifiers;
+            this.isComplexType = isComplexType;
+        }
+
         public String getName() {
             return name;
         }
@@ -112,6 +121,14 @@ public class Metadata {
         public String[] getModifiers() {
             return modifiers;
         }
+
+        public boolean isComplexType() {
+            return isComplexType;
+        }
+
+        public void setComplexType(boolean isComplexType) {
+            this.isComplexType = isComplexType;
+        }
     }

     /**
@@ -123,6 +140,34 @@ public class Metadata {
      * Item's fields
      */
     private List<Metadata.Field> fields;
+    private Set<OutputFormat> outputFormats;
+    private Map<String, String> outputParameters;
+
+    /**
+     * Returns an item's output formats, @see OutputFormat.
+     *
+     * @return item's output formats
+     */
+    public Set<OutputFormat> getOutputFormats() {
+        return outputFormats;
+    }
+
+    public void setOutputFormats(Set<OutputFormat> outputFormats) {
+        this.outputFormats = outputFormats;
+    }
+
+    /**
+     * Returns an item's output parameters, for example - delimiters etc.
+     *
+     * @return item's output parameters
+     */
+    public Map<String, String> getOutputParameters() {
+        return outputParameters;
+    }
+
+    public void setOutputParameters(Map<String, String> outputParameters) {
+        this.outputParameters = outputParameters;
+    }

     /**
      * Constructs an item's Metadata.
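Besides the field list, Metadata now also carries the set of output formats the item's storage can be served in, plus free-form output parameters (currently just the field delimiter). A minimal usage sketch with made-up values; the real population happens in the HiveMetadataFetcher hunk further down, and the Metadata(Item) constructor is assumed from the surrounding class:

    // item is assumed to be an existing Metadata.Item describing the table.
    static Metadata describe(Metadata.Item item) {
        Metadata metadata = new Metadata(item);

        Set<OutputFormat> formats = new HashSet<OutputFormat>();
        formats.add(OutputFormat.TEXT);    // e.g. a table served as plain text
        metadata.setOutputFormats(formats);

        Map<String, String> params = new HashMap<String, String>();
        params.put("DELIMITER", "44");     // delimiter as an ASCII code: ',' == 44
        metadata.setOutputParameters(params);
        return metadata;
    }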
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
index 230f9ff..565db13 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
@@ -21,6 +21,39 @@ package org.apache.hawq.pxf.api;

 /**
- * PXF supported output formats: {@link #TEXT} and {@link #BINARY}
+ * PXF supported output formats: {@link org.apache.hawq.pxf.service.io.Text} and {@link org.apache.hawq.pxf.service.io.GPDBWritable}
  */
-public enum OutputFormat {TEXT, BINARY}
+public enum OutputFormat {
+    TEXT("org.apache.hawq.pxf.service.io.Text"),
+    GPDBWritable("org.apache.hawq.pxf.service.io.GPDBWritable");
+
+    private String className;
+
+    OutputFormat(String className) {
+        this.className = className;
+    }
+
+    /**
+     * Returns a format's implementation class name
+     *
+     * @return a format's implementation class name
+     */
+    public String getClassName() {
+        return className;
+    }
+
+    /**
+     * Looks up output format for given class name if it exists.
+     *
+     * @throws UnsupportedTypeException if output format with given class wasn't found
+     * @return an output format with given class name
+     */
+    public static OutputFormat getOutputFormat(String className) {
+        for (OutputFormat of : values()) {
+            if (of.getClassName().equals(className)) {
+                return of;
+            }
+        }
+        throw new UnsupportedTypeException("Unable to find output format by given class name: " + className);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
index 5afedca..9816fdc 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java
@@ -31,6 +31,7 @@ import java.util.*;
  */
 public class InputData {

+    public static final String DELIMITER_KEY = "DELIMITER";
     public static final int INVALID_SPLIT_IDX = -1;
     private static final Log LOG = LogFactory.getLog(InputData.class);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
index 327a15b..9244ba2 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/MetadataTest.java
@@ -32,7 +32,7 @@ public class MetadataTest {
     @Test
     public void createFieldEmptyNameType() {
         try {
-            Metadata.Field field = new Metadata.Field(null, null, null, null);
+            Metadata.Field field = new Metadata.Field(null, null, false, null, null);
             fail("Empty name, type and source type shouldn't be allowed.");
         } catch (IllegalArgumentException e) {
             assertEquals("Field name, type and source type cannot be empty", e.getMessage());
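The Hive plugin changes that follow all share one theme: the old positional token parsing of fragment user data (parseToks with TOK_SERDE, TOK_KEYS, TOK_FILTER_DONE, TOK_COL_TYPES) is replaced by a single HiveUserData carrier, serialized by the fragmenter and parsed back via HiveUtilities.parseHiveUserData(input, allowedSerdes...). A sketch of the round-trip as implied by HiveUserData.toString() (added later in this diff); the properties string and column types are made-up values, and the parse direction is shown schematically because parseHiveUserData's body falls outside this part of the diff:

    // Fragmenter side: pack everything into one string of seven
    // "!HUDD!"-separated tokens and ship it as fragment user data.
    HiveUserData out = new HiveUserData(
            "org.apache.hadoop.mapred.TextInputFormat",           // input format
            "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", // serde class
            "columns=a,b",                                        // properties string (made up)
            "!HNPT!",                                             // HIVE_NO_PART_TBL: no partitions
            true,                                                 // filter already applied in fragmenter
            "44",                                                 // field delimiter as an ASCII code
            "string:int");                                        // column types (made up)
    byte[] fragmentUserData = out.toString().getBytes();

    // Accessor/resolver side: split the same seven tokens back out.
    String[] toks = new String(fragmentUserData).split(HiveUserData.HIVE_UD_DELIM);
    assert toks.length == HiveUserData.getNumOfTokens();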
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java index ef9f76e..ea3accb 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java @@ -28,6 +28,7 @@ import org.apache.hawq.pxf.api.UnsupportedTypeException; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapred.InputFormat; @@ -42,10 +43,6 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import static org.apache.hawq.pxf.api.io.DataType.*; -import static org.apache.hawq.pxf.api.io.DataType.BPCHAR; -import static org.apache.hawq.pxf.api.io.DataType.BYTEA; - /** * Accessor for Hive tables. The accessor will open and read a split belonging * to a Hive table. Opening a split means creating the corresponding InputFormat @@ -138,12 +135,11 @@ public class HiveAccessor extends HdfsSplittableDataAccessor { */ private InputFormat<?, ?> createInputFormat(InputData input) throws Exception { - String userData = new String(input.getFragmentUserData()); - String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM); - initPartitionFields(toks[3]); - filterInFragmenter = new Boolean(toks[4]); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input); + initPartitionFields(hiveUserData.getPartitionKeys()); + filterInFragmenter = hiveUserData.isFilterInFragmenter(); return HiveDataFragmenter.makeInputFormat( - toks[0]/* inputFormat name */, jobConf); + hiveUserData.getInputFormatName()/* inputFormat name */, jobConf); } /* http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java index 362ac0d..7d85efe 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java @@ -22,12 +22,15 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.hawq.pxf.api.BadRecordException; import org.apache.hawq.pxf.api.OneField; import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.OutputFormat; import org.apache.hawq.pxf.api.UnsupportedTypeException; import org.apache.hawq.pxf.api.io.DataType; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.Utilities; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; +import org.apache.hawq.pxf.service.utilities.ProtocolData; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,7 +43,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; - import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; @@ -57,11 +59,10 @@ import static org.apache.hawq.pxf.api.io.DataType.VARCHAR; */ public class HiveColumnarSerdeResolver extends HiveResolver { private static final Log LOG = LogFactory.getLog(HiveColumnarSerdeResolver.class); - private ColumnarSerDeBase deserializer; private boolean firstColumn; private StringBuilder builder; private StringBuilder parts; - private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType; + private HiveUtilities.PXF_HIVE_SERDES serdeType; public HiveColumnarSerdeResolver(InputData input) throws Exception { super(input); @@ -70,24 +71,22 @@ public class HiveColumnarSerdeResolver extends HiveResolver { /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */ @Override void parseUserData(InputData input) throws Exception { - String[] toks = HiveInputFormatFragmenter.parseToks(input); - String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE]; - if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE.name())) { - serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE; - } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) { - serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE; - } - else { - throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr); - } + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE, HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE); + String serdeClassName = hiveUserData.getSerdeClassName(); + + serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(serdeClassName); parts = new StringBuilder(); - partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS]; + partitionKeys = hiveUserData.getPartitionKeys(); parseDelimiterChar(input); } @Override void initPartitionFields() { - initPartitionFields(parts); + if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) { + initTextPartitionFields(parts); + } else { + super.initPartitionFields(); + } } /** @@ -97,15 +96,19 @@ public class HiveColumnarSerdeResolver extends HiveResolver { */ @Override public List<OneField> getFields(OneRow onerow) throws Exception { - firstColumn = true; - builder = new StringBuilder(); - Object tuple = deserializer.deserialize((Writable) onerow.getData()); - ObjectInspector oi = deserializer.getObjectInspector(); - - traverseTuple(tuple, oi); - /* We follow Hive convention. Partition fields are always added at the end of the record */ - builder.append(parts); - return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString())); + if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) { + firstColumn = true; + builder = new StringBuilder(); + Object tuple = deserializer.deserialize((Writable) onerow.getData()); + ObjectInspector oi = deserializer.getObjectInspector(); + + traverseTuple(tuple, oi); + /* We follow Hive convention. 
Partition fields are always added at the end of the record */ + builder.append(parts); + return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString())); + } else { + return super.getFields(onerow); + } } /* @@ -138,14 +141,7 @@ public class HiveColumnarSerdeResolver extends HiveResolver { serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString()); serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString()); - if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE) { - deserializer = new ColumnarSerDe(); - } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) { - deserializer = new LazyBinaryColumnarSerDe(); - } else { - throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */ - } - + deserializer = HiveUtilities.createDeserializer(serdeType, HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE, HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE); deserializer.initialize(new JobConf(new Configuration(), HiveColumnarSerdeResolver.class), serdeProperties); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java index 2d2b53e..a03d3b7 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java @@ -59,7 +59,7 @@ import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.ProfilesConf; import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; -import org.apache.hawq.pxf.service.ProfileFactory; +import org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory; /** * Fragmenter class for HIVE tables. 
<br> @@ -78,7 +78,6 @@ public class HiveDataFragmenter extends Fragmenter { private static final Log LOG = LogFactory.getLog(HiveDataFragmenter.class); private static final short ALL_PARTS = -1; - public static final String HIVE_UD_DELIM = "!HUDD!"; public static final String HIVE_1_PART_DELIM = "!H1PD!"; public static final String HIVE_PARTITIONS_DELIM = "!HPAD!"; public static final String HIVE_NO_PART_TBL = "!HNPT!"; @@ -163,6 +162,10 @@ public class HiveDataFragmenter extends Fragmenter { Table tbl = HiveUtilities.getHiveTable(client, tblDesc); + Metadata metadata = new Metadata(tblDesc); + HiveUtilities.getSchema(tbl, metadata); + boolean hasComplexTypes = HiveUtilities.hasComplexTypes(metadata); + verifySchema(tbl); List<Partition> partitions = null; @@ -228,7 +231,7 @@ public class HiveDataFragmenter extends Fragmenter { if (partitions.isEmpty()) { props = getSchema(tbl); - fetchMetaDataForSimpleTable(descTable, props); + fetchMetaDataForSimpleTable(descTable, props, hasComplexTypes); } else { List<FieldSchema> partitionKeys = tbl.getPartitionKeys(); @@ -239,7 +242,7 @@ public class HiveDataFragmenter extends Fragmenter { tblDesc.getPath(), tblDesc.getName(), partitionKeys); fetchMetaDataForPartitionedTable(descPartition, props, - partition, partitionKeys, tblDesc.getName()); + partition, partitionKeys, tblDesc.getName(), hasComplexTypes); } } } @@ -255,29 +258,30 @@ public class HiveDataFragmenter extends Fragmenter { } private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc, - Properties props) throws Exception { - fetchMetaDataForSimpleTable(stdsc, props, null); + Properties props, boolean hasComplexTypes) throws Exception { + fetchMetaDataForSimpleTable(stdsc, props, null, hasComplexTypes); } private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc, - Properties props, String tableName) + Properties props, String tableName, boolean hasComplexTypes) throws Exception { fetchMetaData(new HiveTablePartition(stdsc, props, null, null, - tableName)); + tableName), hasComplexTypes); } private void fetchMetaDataForPartitionedTable(StorageDescriptor stdsc, Properties props, Partition partition, List<FieldSchema> partitionKeys, - String tableName) + String tableName, + boolean hasComplexTypes) throws Exception { fetchMetaData(new HiveTablePartition(stdsc, props, partition, - partitionKeys, tableName)); + partitionKeys, tableName), hasComplexTypes); } /* Fills a table partition */ - private void fetchMetaData(HiveTablePartition tablePartition) + private void fetchMetaData(HiveTablePartition tablePartition, boolean hasComplexTypes) throws Exception { InputFormat<?, ?> fformat = makeInputFormat( tablePartition.storageDesc.getInputFormat(), jobConf); @@ -285,7 +289,7 @@ public class HiveDataFragmenter extends Fragmenter { if (inputData.getProfile() != null) { // evaluate optimal profile based on file format if profile was explicitly specified in url // if user passed accessor+fragmenter+resolver - use them - profile = ProfileFactory.get(fformat); + profile = ProfileFactory.get(fformat, hasComplexTypes); } String fragmenterForProfile = null; if (profile != null) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java 
b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java index ca4501b..9199118 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java @@ -55,11 +55,6 @@ import java.util.Properties; */ public class HiveInputFormatFragmenter extends HiveDataFragmenter { private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class); - private static final int EXPECTED_NUM_OF_TOKS = 4; - public static final int TOK_SERDE = 0; - public static final int TOK_KEYS = 1; - public static final int TOK_FILTER_DONE = 2; - public static final int TOK_COL_TYPES = 3; /** Defines the Hive input formats currently supported in pxf */ public enum PXF_HIVE_INPUT_FORMATS { @@ -68,14 +63,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { ORC_FILE_INPUT_FORMAT } - /** Defines the Hive serializers (serde classes) currently supported in pxf */ - public enum PXF_HIVE_SERDES { - COLUMNAR_SERDE, - LAZY_BINARY_COLUMNAR_SERDE, - LAZY_SIMPLE_SERDE, - ORC_SERDE - } - /** * Constructs a HiveInputFormatFragmenter. * @@ -85,34 +72,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { super(inputData, HiveInputFormatFragmenter.class); } - /** - * Extracts the user data: - * serde, partition keys and whether filter was included in fragmenter - * - * @param input input data from client - * @param supportedSerdes supported serde names - * @return parsed tokens - * @throws UserDataException if user data contains unsupported serde - * or wrong number of tokens - */ - static public String[] parseToks(InputData input, String... supportedSerdes) - throws UserDataException { - String userData = new String(input.getFragmentUserData()); - String[] toks = userData.split(HIVE_UD_DELIM); - if (supportedSerdes.length > 0 - && !Arrays.asList(supportedSerdes).contains(toks[TOK_SERDE])) { - throw new UserDataException(toks[TOK_SERDE] - + " serializer isn't supported by " + input.getAccessor()); - } - - if (toks.length != (EXPECTED_NUM_OF_TOKS)) { - throw new UserDataException("HiveInputFormatFragmenter expected " - + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length); - } - - return toks; - } - /* * Checks that hive fields and partitions match the HAWQ schema. 
Throws an * exception if: - the number of fields (+ partitions) do not match the HAWQ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java index ed4f908..66680bb 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java @@ -21,12 +21,12 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.hawq.pxf.api.utilities.InputData; - +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.hadoop.mapred.*; import java.io.IOException; -import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; +import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; /** * Specialization of HiveAccessor for a Hive table stored as Text files. @@ -43,9 +43,9 @@ public class HiveLineBreakAccessor extends HiveAccessor { public HiveLineBreakAccessor(InputData input) throws Exception { super(input, new TextInputFormat()); ((TextInputFormat) inputFormat).configure(jobConf); - String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name()); - initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]); - filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE); + initPartitionFields(hiveUserData.getPartitionKeys()); + filterInFragmenter = hiveUserData.isFilterInFragmenter(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java index 91f91e7..dc76289 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java @@ -21,33 +21,49 @@ package org.apache.hawq.pxf.plugins.hive; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; - +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hawq.pxf.api.Metadata; import org.apache.hawq.pxf.api.MetadataFetcher; +import org.apache.hawq.pxf.api.OutputFormat; import org.apache.hawq.pxf.api.UnsupportedTypeException; import org.apache.hawq.pxf.api.utilities.InputData; +import 
org.apache.hawq.pxf.api.utilities.ProfilesConf; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.ProfileFactory; /** * Class for connecting to Hive's MetaStore and getting schema of Hive tables. */ public class HiveMetadataFetcher extends MetadataFetcher { + private static final String DELIM_FIELD = InputData.DELIMITER_KEY; + private static final Log LOG = LogFactory.getLog(HiveMetadataFetcher.class); private HiveMetaStoreClient client; + private JobConf jobConf; public HiveMetadataFetcher(InputData md) { super(md); // init hive metastore client connection. client = HiveUtilities.initHiveClient(); + jobConf = new JobConf(new Configuration()); } /** @@ -82,8 +98,28 @@ public class HiveMetadataFetcher extends MetadataFetcher { try { Metadata metadata = new Metadata(tblDesc); Table tbl = HiveUtilities.getHiveTable(client, tblDesc); - getSchema(tbl, metadata); + HiveUtilities.getSchema(tbl, metadata); + boolean hasComplexTypes = HiveUtilities.hasComplexTypes(metadata); metadataList.add(metadata); + List<Partition> tablePartitions = client.listPartitionsByFilter(tblDesc.getPath(), tblDesc.getName(), "", (short) -1); + Set<OutputFormat> formats = new HashSet<OutputFormat>(); + //If table has partitions - find out all formats + for (Partition tablePartition : tablePartitions) { + String inputFormat = tablePartition.getSd().getInputFormat(); + OutputFormat outputFormat = getOutputFormat(inputFormat, hasComplexTypes); + formats.add(outputFormat); + } + //If table has no partitions - get single format of table + if (tablePartitions.size() == 0 ) { + String inputFormat = tbl.getSd().getInputFormat(); + OutputFormat outputFormat = getOutputFormat(inputFormat, hasComplexTypes); + formats.add(outputFormat); + } + metadata.setOutputFormats(formats); + Map<String, String> outputParameters = new HashMap<String, String>(); + Integer delimiterCode = HiveUtilities.getDelimiterCode(tbl.getSd()); + outputParameters.put(DELIM_FIELD, delimiterCode.toString()); + metadata.setOutputParameters(outputParameters); } catch (UnsupportedTypeException | UnsupportedOperationException e) { if(ignoreErrors) { LOG.warn("Metadata fetch for " + tblDesc.toString() + " failed. " + e.getMessage()); @@ -97,42 +133,13 @@ public class HiveMetadataFetcher extends MetadataFetcher { return metadataList; } - - /** - * Populates the given metadata object with the given table's fields and partitions, - * The partition fields are added at the end of the table schema. - * Throws an exception if the table contains unsupported field types. - * Supported HCatalog types: TINYINT, - * SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP, - * DATE, DECIMAL, VARCHAR, CHAR. 
- * - * @param tbl Hive table - * @param metadata schema of given table - */ - private void getSchema(Table tbl, Metadata metadata) { - - int hiveColumnsSize = tbl.getSd().getColsSize(); - int hivePartitionsSize = tbl.getPartitionKeysSize(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions."); - } - - // check hive fields - try { - List<FieldSchema> hiveColumns = tbl.getSd().getCols(); - for (FieldSchema hiveCol : hiveColumns) { - metadata.addField(HiveUtilities.mapHiveType(hiveCol)); - } - // check partition fields - List<FieldSchema> hivePartitions = tbl.getPartitionKeys(); - for (FieldSchema hivePart : hivePartitions) { - metadata.addField(HiveUtilities.mapHiveType(hivePart)); - } - } catch (UnsupportedTypeException e) { - String errorMsg = "Failed to retrieve metadata for table " + metadata.getItem() + ". " + - e.getMessage(); - throw new UnsupportedTypeException(errorMsg); - } + private OutputFormat getOutputFormat(String inputFormat, boolean hasComplexTypes) throws Exception { + OutputFormat outputFormat = null; + InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(inputFormat, jobConf); + String profile = ProfileFactory.get(fformat, hasComplexTypes); + String outputFormatClassName = ProfilesConf.getProfilePluginsMap(profile).get("X-GP-OUTPUTFORMAT"); + outputFormat = OutputFormat.getOutputFormat(outputFormatClassName); + return outputFormat; } + } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java index dc195f4..07348b0 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java @@ -30,6 +30,7 @@ import org.apache.hawq.pxf.api.BasicFilter; import org.apache.hawq.pxf.api.LogicalFilter; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.commons.lang.StringUtils; import java.sql.Date; @@ -37,7 +38,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; +import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; /** * Specialization of HiveAccessor for a Hive table that stores only ORC files. 
@@ -61,9 +62,9 @@ public class HiveORCAccessor extends HiveAccessor { */ public HiveORCAccessor(InputData input) throws Exception { super(input, new OrcInputFormat()); - String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.ORC_SERDE.name()); - initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]); - filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE); + initPartitionFields(hiveUserData.getPartitionKeys()); + filterInFragmenter = hiveUserData.isFilterInFragmenter(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java index 93aa474..fec0ff0 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java @@ -34,6 +34,7 @@ import org.apache.hawq.pxf.api.io.DataType; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; import java.util.*; @@ -43,8 +44,7 @@ import java.util.*; */ public class HiveORCSerdeResolver extends HiveResolver { private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class); - private OrcSerde deserializer; - private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType; + private HiveUtilities.PXF_HIVE_SERDES serdeType; private String typesString; public HiveORCSerdeResolver(InputData input) throws Exception { @@ -54,41 +54,16 @@ public class HiveORCSerdeResolver extends HiveResolver { /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */ @Override void parseUserData(InputData input) throws Exception { - String[] toks = HiveInputFormatFragmenter.parseToks(input); - String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE]; - if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) { - serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE; - } else { - throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr); - } - partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS]; - typesString = toks[HiveInputFormatFragmenter.TOK_COL_TYPES]; + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE); + serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName()); + partitionKeys = hiveUserData.getPartitionKeys(); + typesString = hiveUserData.getColTypes(); collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM : input.getUserProperty("COLLECTION_DELIM"); mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM : input.getUserProperty("MAPKEY_DELIM"); } - /** - * getFields returns a singleton list of OneField item. - * OneField item contains two fields: an integer representing the VARCHAR type and a Java - * Object representing the field value. 
- */ - @Override - public List<OneField> getFields(OneRow onerow) throws Exception { - - Object tuple = deserializer.deserialize((Writable) onerow.getData()); - // Each Hive record is a Struct - StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector(); - List<OneField> record = traverseStruct(tuple, soi, false); - - //Add partition fields if any - record.addAll(getPartitionFields()); - - return record; - - } - /* * Get and init the deserializer for the records of this Hive data fragment. * Suppress Warnings added because deserializer.initialize is an abstract function that is deprecated @@ -127,12 +102,7 @@ public class HiveORCSerdeResolver extends HiveResolver { serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString()); serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString()); - if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) { - deserializer = new OrcSerde(); - } else { - throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */ - } - + deserializer = HiveUtilities.createDeserializer(serdeType, HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE); deserializer.initialize(new JobConf(new Configuration(), HiveORCSerdeResolver.class), serdeProperties); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java index 2686851..7132d7b 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java @@ -21,7 +21,7 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.hawq.pxf.api.utilities.InputData; - +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.RCFileRecordReader; import org.apache.hadoop.mapred.FileSplit; @@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.JobConf; import java.io.IOException; -import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; +import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; /** * Specialization of HiveAccessor for a Hive table that stores only RC files. 
@@ -47,9 +47,9 @@ public class HiveRCFileAccessor extends HiveAccessor { */ public HiveRCFileAccessor(InputData input) throws Exception { super(input, new RCFileInputFormat()); - String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name()); - initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]); - filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.COLUMNAR_SERDE, PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE); + initPartitionFields(hiveUserData.getPartitionKeys()); + filterInFragmenter = hiveUserData.isFilterInFragmenter(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java index 3837f78..5646969 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java @@ -27,6 +27,7 @@ import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.Plugin; import org.apache.hawq.pxf.api.utilities.Utilities; import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.commons.lang.CharUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,10 +75,10 @@ public class HiveResolver extends Plugin implements ReadResolver { protected static final String COLLECTION_DELIM = ","; protected String collectionDelim; protected String mapkeyDelim; - private SerDe deserializer; + protected SerDe deserializer; private List<OneField> partitionFields; - private String serdeName; - private String propsString; + protected String serdeClassName; + protected String propsString; String partitionKeys; protected char delimiter; String nullChar = "\\N"; @@ -133,19 +134,11 @@ public class HiveResolver extends Plugin implements ReadResolver { /* Parses user data string (arrived from fragmenter). */ void parseUserData(InputData input) throws Exception { - final int EXPECTED_NUM_OF_TOKS = 5; + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input); - String userData = new String(input.getFragmentUserData()); - String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM); - - if (toks.length != EXPECTED_NUM_OF_TOKS) { - throw new UserDataException("HiveResolver expected " - + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length); - } - - serdeName = toks[1]; - propsString = toks[2]; - partitionKeys = toks[3]; + serdeClassName = hiveUserData.getSerdeClassName(); + propsString = hiveUserData.getPropertiesString(); + partitionKeys = hiveUserData.getPartitionKeys(); collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? 
COLLECTION_DELIM : input.getUserProperty("COLLECTION_DELIM"); @@ -160,14 +153,16 @@ public class HiveResolver extends Plugin implements ReadResolver { void initSerde(InputData inputData) throws Exception { Properties serdeProperties; - Class<?> c = Class.forName(serdeName, true, JavaUtils.getClassLoader()); + Class<?> c = Class.forName(serdeClassName, true, JavaUtils.getClassLoader()); deserializer = (SerDe) c.newInstance(); serdeProperties = new Properties(); - ByteArrayInputStream inStream = new ByteArrayInputStream( - propsString.getBytes()); - serdeProperties.load(inStream); - deserializer.initialize(new JobConf(conf, HiveResolver.class), - serdeProperties); + if (propsString != null ) { + ByteArrayInputStream inStream = new ByteArrayInputStream(propsString.getBytes()); + serdeProperties.load(inStream); + } else { + throw new IllegalArgumentException("propsString is mandatory to initialize serde."); + } + deserializer.initialize(new JobConf(conf, HiveResolver.class), serdeProperties); } /* @@ -271,7 +266,7 @@ public class HiveResolver extends Plugin implements ReadResolver { * The partition fields are initialized one time based on userData provided * by the fragmenter. */ - void initPartitionFields(StringBuilder parts) { + void initTextPartitionFields(StringBuilder parts) { if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) { return; } @@ -625,47 +620,49 @@ public class HiveResolver extends Plugin implements ReadResolver { */ void parseDelimiterChar(InputData input) { - String userDelim = input.getUserProperty("DELIMITER"); + String userDelim = input.getUserProperty(InputData.DELIMITER_KEY); if (userDelim == null) { - throw new IllegalArgumentException("DELIMITER is a required option"); - } - - final int VALID_LENGTH = 1; - final int VALID_LENGTH_HEX = 4; - - if (userDelim.startsWith("\\x")) { // hexadecimal sequence - - if (userDelim.length() != VALID_LENGTH_HEX) { + /* No DELIMITER in URL, try to get it from fragment's user data*/ + HiveUserData hiveUserData = null; + try { + hiveUserData = HiveUtilities.parseHiveUserData(input); + } catch (UserDataException ude) { + throw new IllegalArgumentException("Couldn't parse user data to get " + InputData.DELIMITER_KEY); + } + if (hiveUserData.getDelimiter() == null) { + throw new IllegalArgumentException(InputData.DELIMITER_KEY + " is a required option"); + } + delimiter = (char) Integer.valueOf(hiveUserData.getDelimiter()).intValue(); + } else { + final int VALID_LENGTH = 1; + final int VALID_LENGTH_HEX = 4; + if (userDelim.startsWith("\\x")) { // hexadecimal sequence + if (userDelim.length() != VALID_LENGTH_HEX) { + throw new IllegalArgumentException( + "Invalid hexdecimal value for delimiter (got" + + userDelim + ")"); + } + delimiter = (char) Integer.parseInt( + userDelim.substring(2, VALID_LENGTH_HEX), 16); + if (!CharUtils.isAscii(delimiter)) { + throw new IllegalArgumentException( + "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII " + + delimiter + ")"); + } + return; + } + if (userDelim.length() != VALID_LENGTH) { throw new IllegalArgumentException( - "Invalid hexdecimal value for delimiter (got" + "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got " + userDelim + ")"); } - - delimiter = (char) Integer.parseInt( - userDelim.substring(2, VALID_LENGTH_HEX), 16); - - if (!CharUtils.isAscii(delimiter)) { + if (!CharUtils.isAscii(userDelim.charAt(0))) { throw new IllegalArgumentException( "Invalid delimiter value. 
Must be a single ASCII character, or a hexadecimal sequence (got non ASCII " - + delimiter + ")"); + + userDelim + ")"); } - - return; - } - - if (userDelim.length() != VALID_LENGTH) { - throw new IllegalArgumentException( - "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got " - + userDelim + ")"); + delimiter = userDelim.charAt(0); } - - if (!CharUtils.isAscii(userDelim.charAt(0))) { - throw new IllegalArgumentException( - "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII " - + userDelim + ")"); - } - - delimiter = userDelim.charAt(0); } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java index fdc5f69..76d5cad 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java @@ -22,7 +22,11 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.hawq.pxf.api.OneField; import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.OutputFormat; +import org.apache.hawq.pxf.api.UserDataException; import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.service.utilities.ProtocolData; import java.util.Collections; import java.util.List; @@ -42,21 +46,32 @@ public class HiveStringPassResolver extends HiveResolver { @Override void parseUserData(InputData input) throws Exception { - String userData = new String(input.getFragmentUserData()); - String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input); parseDelimiterChar(input); parts = new StringBuilder(); - partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS]; + partitionKeys = hiveUserData.getPartitionKeys(); + serdeClassName = hiveUserData.getSerdeClassName(); + + /* Needed only for GPDBWritable format*/ + if (((ProtocolData) inputData).outputFormat() == OutputFormat.GPDBWritable) { + propsString = hiveUserData.getPropertiesString(); + } } @Override - void initSerde(InputData input) { - /* nothing to do here */ + void initSerde(InputData input) throws Exception { + if (((ProtocolData) inputData).outputFormat() == OutputFormat.GPDBWritable) { + super.initSerde(input); + } } @Override void initPartitionFields() { - initPartitionFields(parts); + if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) { + initTextPartitionFields(parts); + } else { + super.initPartitionFields(); + } } /** @@ -66,9 +81,13 @@ public class HiveStringPassResolver extends HiveResolver { */ @Override public List<OneField> getFields(OneRow onerow) throws Exception { - String line = (onerow.getData()).toString(); - - /* We follow Hive convention. Partition fields are always added at the end of the record */ - return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts)); + if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) { + String line = (onerow.getData()).toString(); + /* We follow Hive convention. 
Partition fields are always added at the end of the record */ + return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts)); + } else { + return super.getFields(onerow); + } } + } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java new file mode 100644 index 0000000..e3632e0 --- /dev/null +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hawq.pxf.plugins.hive; + +/** + * Class which is a carrier for user data in Hive fragment. + * + */ +public class HiveUserData { + + public static final String HIVE_UD_DELIM = "!HUDD!"; + private static final int EXPECTED_NUM_OF_TOKS = 7; + + public HiveUserData(String inputFormatName, String serdeClassName, + String propertiesString, String partitionKeys, + boolean filterInFragmenter, + String delimiter, + String colTypes) { + + this.inputFormatName = inputFormatName; + this.serdeClassName = serdeClassName; + this.propertiesString = propertiesString; + this.partitionKeys = partitionKeys; + this.filterInFragmenter = filterInFragmenter; + this.delimiter = (delimiter == null ? 
"0" : delimiter); + this.colTypes = colTypes; + } + + /** + * Returns input format of a fragment + * + * @return input format of a fragment + */ + public String getInputFormatName() { + return inputFormatName; + } + + /** + * Returns SerDe class name + * + * @return SerDe class name + */ + public String getSerdeClassName() { + return serdeClassName; + } + + /** + * Returns properties string needed for SerDe initialization + * + * @return properties string needed for SerDe initialization + */ + public String getPropertiesString() { + return propertiesString; + } + + /** + * Returns partition keys + * + * @return partition keys + */ + public String getPartitionKeys() { + return partitionKeys; + } + + /** + * Returns whether filtering was done in fragmenter + * + * @return true if filtering was done in fragmenter + */ + public boolean isFilterInFragmenter() { + return filterInFragmenter; + } + + /** + * Returns field delimiter + * + * @return field delimiter + */ + public String getDelimiter() { + return delimiter; + } + + public void setDelimiter(String delimiter) { + this.delimiter = delimiter; + } + + private String inputFormatName; + private String serdeClassName; + private String propertiesString; + private String partitionKeys; + private boolean filterInFragmenter; + private String delimiter; + private String colTypes; + + /** + * The method returns expected number of tokens in raw user data + * + * @return number of tokens in raw user data + */ + public static int getNumOfTokens() { + return EXPECTED_NUM_OF_TOKS; + } + + @Override + public String toString() { + return inputFormatName + HiveUserData.HIVE_UD_DELIM + + serdeClassName + HiveUserData.HIVE_UD_DELIM + + propertiesString + HiveUserData.HIVE_UD_DELIM + + partitionKeys + HiveUserData.HIVE_UD_DELIM + + filterInFragmenter + HiveUserData.HIVE_UD_DELIM + + delimiter + HiveUserData.HIVE_UD_DELIM + + colTypes; + } + + public String getColTypes() { + return colTypes; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java index d91e949..ea65a66 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/EnumHiveToHawqType.java @@ -42,37 +42,48 @@ public enum EnumHiveToHawqType { FloatType("float", EnumHawqType.Float4Type), DoubleType("double", EnumHawqType.Float8Type), StringType("string", EnumHawqType.TextType), - BinaryType("binary", EnumHawqType.ByteaType), + BinaryType("binary", EnumHawqType.ByteaType, true), TimestampType("timestamp", EnumHawqType.TimestampType), DateType("date", EnumHawqType.DateType), DecimalType("decimal", EnumHawqType.NumericType, "[(,)]"), VarcharType("varchar", EnumHawqType.VarcharType, "[(,)]"), CharType("char", EnumHawqType.BpcharType, "[(,)]"), - ArrayType("array", EnumHawqType.TextType, "[<,>]"), - MapType("map", EnumHawqType.TextType, "[<,>]"), - StructType("struct", EnumHawqType.TextType, "[<,>]"), - UnionType("uniontype", EnumHawqType.TextType, "[<,>]"); + ArrayType("array", EnumHawqType.TextType, "[<,>]", true), + MapType("map", EnumHawqType.TextType, "[<,>]", true), + StructType("struct", 
EnumHawqType.TextType, "[<,>]", true), + UnionType("uniontype", EnumHawqType.TextType, "[<,>]", true); private String typeName; private EnumHawqType hawqType; private String splitExpression; private byte size; + private boolean isComplexType; EnumHiveToHawqType(String typeName, EnumHawqType hawqType) { this.typeName = typeName; this.hawqType = hawqType; } - + EnumHiveToHawqType(String typeName, EnumHawqType hawqType, byte size) { this(typeName, hawqType); this.size = size; } + EnumHiveToHawqType(String typeName, EnumHawqType hawqType, boolean isComplexType) { + this(typeName, hawqType); + this.isComplexType = isComplexType; + } + EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression) { this(typeName, hawqType); this.splitExpression = splitExpression; } + EnumHiveToHawqType(String typeName, EnumHawqType hawqType, String splitExpression, boolean isComplexType) { + this(typeName, hawqType, splitExpression); + this.isComplexType = isComplexType; + } + /** * * @return name of type @@ -216,4 +227,12 @@ public enum EnumHiveToHawqType { return size; } + public boolean isComplexType() { + return isComplexType; + } + + public void setComplexType(boolean isComplexType) { + this.isComplexType = isComplexType; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java index f7ebf4d..37f4ac2 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java @@ -35,17 +35,28 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.*; import org.apache.hawq.pxf.api.Fragmenter; import org.apache.hawq.pxf.api.Metadata; +import org.apache.hawq.pxf.api.Metadata.Field; import org.apache.hawq.pxf.api.UnsupportedTypeException; +import org.apache.hawq.pxf.api.UserDataException; import org.apache.hawq.pxf.api.utilities.EnumHawqType; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Utilities; import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter; import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter; import org.apache.hawq.pxf.plugins.hive.HiveTablePartition; +import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS; -import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; +import org.apache.hawq.pxf.plugins.hive.HiveUserData; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; /** * Class containing helper functions 
connecting @@ -53,6 +64,46 @@ import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDE */ public class HiveUtilities { + /** Defines the Hive serializers (serde classes) currently supported in PXF */ + public enum PXF_HIVE_SERDES { + COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"), + LAZY_BINARY_COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"), + LAZY_SIMPLE_SERDE("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"), + ORC_SERDE("org.apache.hadoop.hive.ql.io.orc.OrcSerde"); + + private String serdeClassName; + + PXF_HIVE_SERDES(String serdeClassName) { + this.serdeClassName = serdeClassName; + } + + /** + * Looks up a serde by its class name. + * + * @param serdeClassName input serde class name + * @param allowedSerdes all serdes allowed in the current context + * @return serde matching the given class name + * @throws UnsupportedTypeException if no serde matches the class name, or the matching serde is not allowed in the current context + */ + public static PXF_HIVE_SERDES getPxfHiveSerde(String serdeClassName, PXF_HIVE_SERDES... allowedSerdes) { + for (PXF_HIVE_SERDES s : values()) { + if (s.getSerdeClassName().equals(serdeClassName)) { + + if (allowedSerdes.length > 0 + && !Arrays.asList(allowedSerdes).contains(s)) { + throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeClassName); + } + return s; + } + } + throw new UnsupportedTypeException("Unable to find serde for class name: " + serdeClassName); + } + + public String getSerdeClassName() { + return serdeClassName; + } + } + private static final Log LOG = LogFactory.getLog(HiveUtilities.class); private static final String WILDCARD = "*"; @@ -64,10 +115,7 @@ public class HiveUtilities { static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat"; static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat"; static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; - static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"; - static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"; - static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; - static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; + private static final int DEFAULT_DELIMITER_CODE = 44; /** * Initializes the HiveMetaStoreClient @@ -162,7 +210,7 @@ public class HiveUtilities { } else hiveTypeName = hiveType; - return new Metadata.Field(fieldName, hawqType, hiveTypeName, modifiers); + return new Metadata.Field(fieldName, hawqType, hiveToHawqType.isComplexType(), hiveTypeName, modifiers); } /** @@ -376,31 +424,6 @@ public class HiveUtilities { } }
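A minimal usage sketch of the new lookup, assuming only what this diff shows (the calling context is hypothetical; the class names are the serde classes registered in the enum above):

    // Succeeds: OrcSerde is registered and appears in the allowed list.
    PXF_HIVE_SERDES serde = PXF_HIVE_SERDES.getPxfHiveSerde(
            "org.apache.hadoop.hive.ql.io.orc.OrcSerde", PXF_HIVE_SERDES.ORC_SERDE);

    // Throws UnsupportedTypeException: the serde is registered,
    // but is not in the allowed list for this context.
    PXF_HIVE_SERDES.getPxfHiveSerde(
            "org.apache.hadoop.hive.ql.io.orc.OrcSerde", PXF_HIVE_SERDES.COLUMNAR_SERDE);

Passing no allowedSerdes skips the allow-list check and only verifies that the class name is known.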
- /* - * Validates that partition serde corresponds to PXF supported serdes and - * transforms the class name to an enumeration for writing it to the - * resolvers on other PXF instances. - */ - private static String assertSerde(String className, HiveTablePartition partData) - throws Exception { - switch (className) { - case STR_COLUMNAR_SERDE: - return PXF_HIVE_SERDES.COLUMNAR_SERDE.name(); - case STR_LAZY_BINARY_COLUMNAR_SERDE: - return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name(); - case STR_LAZY_SIMPLE_SERDE: - return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name(); - case STR_ORC_SERDE: - return PXF_HIVE_SERDES.ORC_SERDE.name(); - default: - throw new UnsupportedTypeException( - "HiveInputFormatFragmenter does not yet support " - + className + " for " + partData - + ". Supported serializers are: " - + Arrays.toString(PXF_HIVE_SERDES.values())); - } - } - /* Turns the partition keys into a string */ public static String serializePartitionKeys(HiveTablePartition partData) throws Exception { @@ -429,10 +452,19 @@ public class HiveUtilities { return partitionKeys.toString(); } + /** + * Serializes the fragment-related attributes needed for reading and resolution into a string + * + * @param fragmenterClassName fragmenter class name + * @param partData table partition data + * @param filterInFragmenter whether filtering was done in the fragmenter + * @return serialized representation of fragment-related attributes + * @throws Exception if the fragmenter class cannot be resolved or serialization fails + */ @SuppressWarnings("unchecked") public static byte[] makeUserData(String fragmenterClassName, HiveTablePartition partData, boolean filterInFragmenter) throws Exception { - String userData = null; + HiveUserData hiveUserData = null; if (fragmenterClassName == null) { throw new IllegalArgumentException("No fragmenter provided."); @@ -440,25 +472,158 @@ public class HiveUtilities { Class fragmenterClass = Class.forName(fragmenterClassName); + String inputFormatName = partData.storageDesc.getInputFormat(); + String serdeClassName = partData.storageDesc.getSerdeInfo().getSerializationLib(); + String propertiesString = serializeProperties(partData.properties); + String partitionKeys = serializePartitionKeys(partData); + String delimiter = getDelimiterCode(partData.storageDesc).toString(); + String colTypes = partData.properties.getProperty("columns.types"); + if (HiveInputFormatFragmenter.class.isAssignableFrom(fragmenterClass)) { - String inputFormatName = partData.storageDesc.getInputFormat(); - String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib(); - String partitionKeys = serializePartitionKeys(partData); - String colTypes = partData.properties.getProperty("columns.types"); assertFileType(inputFormatName, partData); - userData = assertSerde(serdeName, partData) + HiveDataFragmenter.HIVE_UD_DELIM - + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter + HiveDataFragmenter.HIVE_UD_DELIM + colTypes; - } else if (HiveDataFragmenter.class.isAssignableFrom(fragmenterClass)){ - String inputFormatName = partData.storageDesc.getInputFormat(); - String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib(); - String propertiesString = serializeProperties(partData.properties); - String partitionKeys = serializePartitionKeys(partData); - userData = inputFormatName + HiveDataFragmenter.HIVE_UD_DELIM + serdeName - + HiveDataFragmenter.HIVE_UD_DELIM + propertiesString + HiveDataFragmenter.HIVE_UD_DELIM - + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter; - } else { - throw new IllegalArgumentException("HiveUtilities#makeUserData is not implemented for " + fragmenterClassName); } - return userData.getBytes(); + + hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter, delimiter, colTypes); + + return hiveUserData.toString().getBytes(); + }
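For reference, makeUserData now emits one fixed seven-token layout for every fragmenter, which HiveUserData.toString() joins with HiveUserData.HIVE_UD_DELIM (the delimiter's literal value is defined in HiveUserData and is not shown in this hunk). Schematically, with <D> standing in for the delimiter and illustrative field values:

    // inputFormatName <D> serdeClassName <D> propertiesString <D> partitionKeys
    //     <D> filterInFragmenter <D> delimiter <D> colTypes
    // e.g. org.apache.hadoop.mapred.TextInputFormat <D>
    //      org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe <D> ... <D>
    //      HIVE_NO_PART_TBL <D> true <D> 44 <D> int:string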
+ + /** + * Parses raw user data into a HiveUserData instance + * + * @param input input data + * @param supportedSerdes list of serdes allowed in the current context + * @return instance of the HiveUserData class + * @throws UserDataException if the raw data doesn't contain the expected number of tokens + */ + public static HiveUserData parseHiveUserData(InputData input, PXF_HIVE_SERDES... supportedSerdes) throws UserDataException { + String userData = new String(input.getFragmentUserData()); + String[] toks = userData.split(HiveUserData.HIVE_UD_DELIM, HiveUserData.getNumOfTokens()); + + if (toks.length != (HiveUserData.getNumOfTokens())) { + throw new UserDataException("HiveInputFormatFragmenter expected " + + HiveUserData.getNumOfTokens() + " tokens, but got " + toks.length); + } + + HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4]), toks[5], toks[6]); + + if (supportedSerdes.length > 0) { + /* Make sure this serde is supported; throws if it is not */ + PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName(), supportedSerdes); + } + + return hiveUserData; + } + + private static String getSerdeParameter(StorageDescriptor sd, String parameterKey) { + String parameterValue = null; + if (sd != null && sd.getSerdeInfo() != null && sd.getSerdeInfo().getParameters() != null && sd.getSerdeInfo().getParameters().get(parameterKey) != null) { + parameterValue = sd.getSerdeInfo().getParameters().get(parameterKey); + } + + return parameterValue; + } + + /** + * Extracts the field delimiter from a storage descriptor. + * When the delimiter cannot be extracted from the storage descriptor, the default value is used + * + * @param sd StorageDescriptor of table/partition + * @return ASCII code of the delimiter + */ + public static Integer getDelimiterCode(StorageDescriptor sd) { + Integer delimiterCode = null; + + String delimiter = getSerdeParameter(sd, serdeConstants.FIELD_DELIM); + if (delimiter != null) { + delimiterCode = (int) delimiter.charAt(0); + return delimiterCode; + } + + delimiter = getSerdeParameter(sd, serdeConstants.SERIALIZATION_FORMAT); + if (delimiter != null) { + delimiterCode = Integer.parseInt(delimiter); + return delimiterCode; + } + + return DEFAULT_DELIMITER_CODE; + } + + /** + * Determines whether the metadata definition has any complex types + * @see EnumHiveToHawqType for the complex type attribute definition + * + * @param metadata metadata of relation + * @return true if metadata has at least one field of complex type + */ + public static boolean hasComplexTypes(Metadata metadata) { + boolean hasComplexTypes = false; + List<Field> fields = metadata.getFields(); + for (Field field: fields) { + if (field.isComplexType()) { + hasComplexTypes = true; + break; + } + } + + return hasComplexTypes; + }
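A short sketch of the delimiter fallback chain above, using the metastore Thrift classes (StorageDescriptor, SerDeInfo) already imported in this file; the parameter values are illustrative:

    java.util.Map<String, String> params = new java.util.HashMap<>();
    params.put(serdeConstants.FIELD_DELIM, "|");  // field.delim takes precedence
    SerDeInfo serdeInfo = new SerDeInfo();
    serdeInfo.setParameters(params);
    StorageDescriptor sd = new StorageDescriptor();
    sd.setSerdeInfo(serdeInfo);

    HiveUtilities.getDelimiterCode(sd);  // 124, the ASCII code of '|'
    // If field.delim is absent, serialization.format is tried next;
    // if both are absent, DEFAULT_DELIMITER_CODE (44, a comma) is returned.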
+ + /** + * Populates the given metadata object with the given table's fields and partitions. + * The partition fields are added at the end of the table schema. + * Throws an exception if the table contains unsupported field types. + * Supported HCatalog types: TINYINT, + * SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP, + * DATE, DECIMAL, VARCHAR, CHAR. + * + * @param tbl Hive table + * @param metadata schema of the given table + */ + public static void getSchema(Table tbl, Metadata metadata) { + + int hiveColumnsSize = tbl.getSd().getColsSize(); + int hivePartitionsSize = tbl.getPartitionKeysSize(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Hive table: " + hiveColumnsSize + " fields, " + hivePartitionsSize + " partitions."); + } + + // check hive fields + try { + List<FieldSchema> hiveColumns = tbl.getSd().getCols(); + for (FieldSchema hiveCol : hiveColumns) { + metadata.addField(HiveUtilities.mapHiveType(hiveCol)); + } + // check partition fields + List<FieldSchema> hivePartitions = tbl.getPartitionKeys(); + for (FieldSchema hivePart : hivePartitions) { + metadata.addField(HiveUtilities.mapHiveType(hivePart)); + } + } catch (UnsupportedTypeException e) { + String errorMsg = "Failed to retrieve metadata for table " + metadata.getItem() + ". " + + e.getMessage(); + throw new UnsupportedTypeException(errorMsg); + } + } + + /** + * Creates an instance of a given serde type + * + * @param serdeType serde type to instantiate + * @param allowedSerdes serde types allowed in the current context + * @return instance of the given serde + * @throws UnsupportedTypeException if the given serde is not allowed in the current context + */ + @SuppressWarnings("deprecation") + public static SerDe createDeserializer(PXF_HIVE_SERDES serdeType, PXF_HIVE_SERDES... allowedSerdes) throws Exception { + SerDe deserializer = null; + if (!Arrays.asList(allowedSerdes).contains(serdeType)) { + throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); + } + + deserializer = (SerDe) Utilities.createAnyInstance(serdeType.getSerdeClassName()); + + return deserializer; } }
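One usage sketch for the new createDeserializer helper (the allowed-serde list shown is illustrative):

    // Instantiates ColumnarSerDe because it is in the allowed list;
    // passing ORC_SERDE as the first argument here would throw
    // UnsupportedTypeException, since it is not among the allowed serdes.
    SerDe deserializer = HiveUtilities.createDeserializer(
            PXF_HIVE_SERDES.COLUMNAR_SERDE,
            PXF_HIVE_SERDES.COLUMNAR_SERDE,
            PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);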
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java new file mode 100644 index 0000000..f36f074 --- /dev/null +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java @@ -0,0 +1,61 @@ +package org.apache.hawq.pxf.plugins.hive.utilities; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hawq.pxf.api.Metadata; + +/** + * Factory class that returns the optimal profile for a given input format + * + */ +public class ProfileFactory { + + private static final String HIVE_TEXT_PROFILE = "HiveText"; + private static final String HIVE_RC_PROFILE = "HiveRC"; + private static final String HIVE_ORC_PROFILE = "HiveORC"; + private static final String HIVE_PROFILE = "Hive"; + + /** + * Returns the optimal profile for the given input format + * + * @param inputFormat input format of table/partition + * @param hasComplexTypes whether the record has complex types, see @EnumHiveToHawqType + * @return name of the optimal profile + */ + public static String get(InputFormat inputFormat, boolean hasComplexTypes) { + String profileName = null; + if (inputFormat instanceof TextInputFormat && !hasComplexTypes) { + profileName = HIVE_TEXT_PROFILE; + } else if (inputFormat instanceof RCFileInputFormat) { + profileName = HIVE_RC_PROFILE; + } else if (inputFormat instanceof OrcInputFormat) { + profileName = HIVE_ORC_PROFILE; + } else { + // Default case + profileName = HIVE_PROFILE; + } + return profileName; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java index d9d97fc..6e40f9a 100644 --- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java +++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcherTest.java @@ -132,6 +132,7 @@ public class HiveMetadataFetcherTest { fields.add(new FieldSchema("field2", "int", null)); StorageDescriptor sd = new StorageDescriptor(); sd.setCols(fields); + sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat"); Table hiveTable = new Table(); hiveTable.setTableType("MANAGED_TABLE"); hiveTable.setSd(sd); @@ -176,6 +177,7 @@ public class HiveMetadataFetcherTest { fields.add(new FieldSchema("field2", "int", null)); StorageDescriptor sd = new StorageDescriptor(); sd.setCols(fields); + sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat"); // Mock hive tables returned from hive client for(int index=1;index<=2;index++) { @@ -235,6 +237,7 @@ public class HiveMetadataFetcherTest { fields.add(new FieldSchema("field2", "int", null)); StorageDescriptor sd = new StorageDescriptor(); sd.setCols(fields); + sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat"); Table hiveTable2 = new Table(); hiveTable2.setTableType("MANAGED_TABLE"); hiveTable2.setSd(sd); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6fa1ced2/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java index 7bbe811..8b4bf13 100644 --- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java +++ 
b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java @@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.*; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -43,7 +45,7 @@ import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) -@PrepareForTest({HiveORCAccessor.class, HiveInputFormatFragmenter.class, HdfsUtilities.class, HiveDataFragmenter.class}) +@PrepareForTest({HiveORCAccessor.class, HiveUtilities.class, HdfsUtilities.class, HiveDataFragmenter.class}) @SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf", "org.apache.hadoop.hive.metastore.api.MetaException", "org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities"}) // Prevents static inits @@ -61,8 +63,9 @@ public class HiveORCAccessorTest { jobConf = new JobConf(); PowerMockito.whenNew(JobConf.class).withAnyArguments().thenReturn(jobConf); - PowerMockito.mockStatic(HiveInputFormatFragmenter.class); - PowerMockito.when(HiveInputFormatFragmenter.parseToks(any(InputData.class), any(String[].class))).thenReturn(new String[]{"", HiveDataFragmenter.HIVE_NO_PART_TBL, "true"}); + PowerMockito.mockStatic(HiveUtilities.class); + PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true, "1", "")); + PowerMockito.mockStatic(HdfsUtilities.class); PowerMockito.mockStatic(HiveDataFragmenter.class);

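Taken together, the profile selection introduced by ProfileFactory.get maps input formats to profiles as sketched below (in production the InputFormat instance comes from the table's or partition's storage descriptor, and hasComplexTypes from HiveUtilities.hasComplexTypes):

    ProfileFactory.get(new TextInputFormat(), false);   // "HiveText"
    ProfileFactory.get(new TextInputFormat(), true);    // "Hive" (complex types need full serde resolution)
    ProfileFactory.get(new RCFileInputFormat(), false); // "HiveRC"
    ProfileFactory.get(new OrcInputFormat(), false);    // "HiveORC"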