Repository: incubator-hawq Updated Branches: refs/heads/HAWQ-1228 [created] 607184c36
HAWQ-1228. Initial commit. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/0574e75f Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/0574e75f Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/0574e75f Branch: refs/heads/HAWQ-1228 Commit: 0574e75fa972f6ccddd1f55a98972223b3860759 Parents: 25c87ec Author: Oleksandr Diachenko <[email protected]> Authored: Wed Dec 28 14:03:50 2016 -0800 Committer: Oleksandr Diachenko <[email protected]> Committed: Fri Jan 6 11:55:19 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/hawq/pxf/api/Metadata.java | 12 ++ .../org/apache/hawq/pxf/api/OutputFormat.java | 2 +- .../hawq/pxf/plugins/hive/HiveAccessor.java | 14 +-- .../plugins/hive/HiveColumnarSerdeResolver.java | 62 +++++----- .../pxf/plugins/hive/HiveDataFragmenter.java | 1 - .../plugins/hive/HiveInputFormatFragmenter.java | 40 ------- .../pxf/plugins/hive/HiveLineBreakAccessor.java | 10 +- .../pxf/plugins/hive/HiveMetadataFetcher.java | 41 ++++++- .../hawq/pxf/plugins/hive/HiveORCAccessor.java | 11 +- .../pxf/plugins/hive/HiveORCSerdeResolver.java | 17 ++- .../pxf/plugins/hive/HiveRCFileAccessor.java | 10 +- .../hawq/pxf/plugins/hive/HiveResolver.java | 38 +++---- .../plugins/hive/HiveStringPassResolver.java | 42 +++++-- .../hawq/pxf/plugins/hive/HiveUserData.java | 71 ++++++++++++ .../plugins/hive/utilities/HiveUtilities.java | 113 +++++++++++-------- .../pxf/plugins/hive/HiveORCAccessorTest.java | 9 +- .../apache/hawq/pxf/service/ProfileFactory.java | 20 ++-- .../src/main/resources/pxf-profiles-default.xml | 4 + src/backend/access/external/fileam.c | 4 + src/backend/catalog/external/externalmd.c | 57 ++++++++-- src/bin/gpfusion/gpbridgeapi.c | 43 ++++++- src/include/access/extprotocol.h | 1 + src/include/access/fileam.h | 1 + src/include/catalog/external/itemmd.h | 3 + src/include/catalog/pg_exttable.h | 10 
+- 25 files changed, 428 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java index 9e1c137..7e3b92e 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java @@ -22,6 +22,7 @@ package org.apache.hawq.pxf.api; import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.hawq.pxf.api.utilities.EnumHawqType; import org.apache.commons.lang.StringUtils; @@ -124,6 +125,17 @@ public class Metadata { */ private List<Metadata.Field> fields; + + private Set<OutputFormat> outputFormats; + + public Set<OutputFormat> getOutputFormats() { + return outputFormats; + } + + public void setFormats(Set<OutputFormat> outputFormats) { + this.outputFormats = outputFormats; + } + /** * Constructs an item's Metadata. 
* http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java index 230f9ff..82a747f 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java @@ -23,4 +23,4 @@ package org.apache.hawq.pxf.api; /** * PXF supported output formats: {@link #TEXT} and {@link #BINARY} */ -public enum OutputFormat {TEXT, BINARY} +public enum OutputFormat {TEXT, BINARY, UNKNOWN} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java index ef9f76e..ea3accb 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java @@ -28,6 +28,7 @@ import org.apache.hawq.pxf.api.UnsupportedTypeException; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapred.InputFormat; @@ -42,10 +43,6 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import static org.apache.hawq.pxf.api.io.DataType.*; -import static org.apache.hawq.pxf.api.io.DataType.BPCHAR; 
-import static org.apache.hawq.pxf.api.io.DataType.BYTEA; - /** * Accessor for Hive tables. The accessor will open and read a split belonging * to a Hive table. Opening a split means creating the corresponding InputFormat @@ -138,12 +135,11 @@ public class HiveAccessor extends HdfsSplittableDataAccessor { */ private InputFormat<?, ?> createInputFormat(InputData input) throws Exception { - String userData = new String(input.getFragmentUserData()); - String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM); - initPartitionFields(toks[3]); - filterInFragmenter = new Boolean(toks[4]); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input); + initPartitionFields(hiveUserData.getPartitionKeys()); + filterInFragmenter = hiveUserData.isFilterInFragmenter(); return HiveDataFragmenter.makeInputFormat( - toks[0]/* inputFormat name */, jobConf); + hiveUserData.getInputFormatName()/* inputFormat name */, jobConf); } /* http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java index 362ac0d..2bf39ff 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java @@ -22,12 +22,15 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.hawq.pxf.api.BadRecordException; import org.apache.hawq.pxf.api.OneField; import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.OutputFormat; import org.apache.hawq.pxf.api.UnsupportedTypeException; import org.apache.hawq.pxf.api.io.DataType; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import 
org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.Utilities; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; +import org.apache.hawq.pxf.service.utilities.ProtocolData; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,7 +43,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; - import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; @@ -57,11 +59,11 @@ import static org.apache.hawq.pxf.api.io.DataType.VARCHAR; */ public class HiveColumnarSerdeResolver extends HiveResolver { private static final Log LOG = LogFactory.getLog(HiveColumnarSerdeResolver.class); - private ColumnarSerDeBase deserializer; + //private ColumnarSerDeBase deserializer; private boolean firstColumn; private StringBuilder builder; private StringBuilder parts; - private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType; + private HiveUtilities.PXF_HIVE_SERDES serdeType; public HiveColumnarSerdeResolver(InputData input) throws Exception { super(input); @@ -70,24 +72,22 @@ public class HiveColumnarSerdeResolver extends HiveResolver { /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */ @Override void parseUserData(InputData input) throws Exception { - String[] toks = HiveInputFormatFragmenter.parseToks(input); - String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE]; - if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE.name())) { - serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE; - } else if 
(serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) { - serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE; - } - else { - throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr); - } + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE, HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE); + String serdeClassName = hiveUserData.getSerdeClassName(); + + serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(serdeClassName); parts = new StringBuilder(); - partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS]; + partitionKeys = hiveUserData.getPartitionKeys(); parseDelimiterChar(input); } @Override void initPartitionFields() { - initPartitionFields(parts); + if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) { + initTextPartitionFields(parts); + } else { + super.initPartitionFields(); + } } /** @@ -97,15 +97,19 @@ public class HiveColumnarSerdeResolver extends HiveResolver { */ @Override public List<OneField> getFields(OneRow onerow) throws Exception { - firstColumn = true; - builder = new StringBuilder(); - Object tuple = deserializer.deserialize((Writable) onerow.getData()); - ObjectInspector oi = deserializer.getObjectInspector(); - - traverseTuple(tuple, oi); - /* We follow Hive convention. Partition fields are always added at the end of the record */ - builder.append(parts); - return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString())); + if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) { + firstColumn = true; + builder = new StringBuilder(); + Object tuple = deserializer.deserialize((Writable) onerow.getData()); + ObjectInspector oi = deserializer.getObjectInspector(); + + traverseTuple(tuple, oi); + /* We follow Hive convention. 
Partition fields are always added at the end of the record */ + builder.append(parts); + return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString())); + } else { + return super.getFields(onerow); + } } /* @@ -138,9 +142,10 @@ public class HiveColumnarSerdeResolver extends HiveResolver { serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString()); serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString()); - if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE) { + //TODO: Move this logic to utilities + if (serdeType == HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE) { deserializer = new ColumnarSerDe(); - } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) { + } else if (serdeType == HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) { deserializer = new LazyBinaryColumnarSerDe(); } else { throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */ @@ -233,4 +238,9 @@ public class HiveColumnarSerdeResolver extends HiveResolver { } firstColumn = false; } + + @Override + void parseDelimiterChar(InputData input) { + delimiter = 1; + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java index 2d2b53e..97f278d 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java @@ -78,7 +78,6 @@ public class HiveDataFragmenter extends Fragmenter { private static final Log LOG = LogFactory.getLog(HiveDataFragmenter.class); private 
static final short ALL_PARTS = -1; - public static final String HIVE_UD_DELIM = "!HUDD!"; public static final String HIVE_1_PART_DELIM = "!H1PD!"; public static final String HIVE_PARTITIONS_DELIM = "!HPAD!"; public static final String HIVE_NO_PART_TBL = "!HNPT!"; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java index 051a246..4449d8f 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java @@ -55,10 +55,6 @@ import java.util.List; */ public class HiveInputFormatFragmenter extends HiveDataFragmenter { private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class); - private static final int EXPECTED_NUM_OF_TOKS = 3; - public static final int TOK_SERDE = 0; - public static final int TOK_KEYS = 1; - public static final int TOK_FILTER_DONE = 2; /** Defines the Hive input formats currently supported in pxf */ public enum PXF_HIVE_INPUT_FORMATS { @@ -67,14 +63,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { ORC_FILE_INPUT_FORMAT } - /** Defines the Hive serializers (serde classes) currently supported in pxf */ - public enum PXF_HIVE_SERDES { - COLUMNAR_SERDE, - LAZY_BINARY_COLUMNAR_SERDE, - LAZY_SIMPLE_SERDE, - ORC_SERDE - } - /** * Constructs a HiveInputFormatFragmenter. 
* @@ -84,34 +72,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { super(inputData, HiveInputFormatFragmenter.class); } - /** - * Extracts the user data: - * serde, partition keys and whether filter was included in fragmenter - * - * @param input input data from client - * @param supportedSerdes supported serde names - * @return parsed tokens - * @throws UserDataException if user data contains unsupported serde - * or wrong number of tokens - */ - static public String[] parseToks(InputData input, String... supportedSerdes) - throws UserDataException { - String userData = new String(input.getFragmentUserData()); - String[] toks = userData.split(HIVE_UD_DELIM); - if (supportedSerdes.length > 0 - && !Arrays.asList(supportedSerdes).contains(toks[TOK_SERDE])) { - throw new UserDataException(toks[TOK_SERDE] - + " serializer isn't supported by " + input.getAccessor()); - } - - if (toks.length != (EXPECTED_NUM_OF_TOKS)) { - throw new UserDataException("HiveInputFormatFragmenter expected " - + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length); - } - - return toks; - } - /* * Checks that hive fields and partitions match the HAWQ schema. 
Throws an * exception if: - the number of fields (+ partitions) do not match the HAWQ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java index ed4f908..66680bb 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java @@ -21,12 +21,12 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.hawq.pxf.api.utilities.InputData; - +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.hadoop.mapred.*; import java.io.IOException; -import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; +import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; /** * Specialization of HiveAccessor for a Hive table stored as Text files. 
@@ -43,9 +43,9 @@ public class HiveLineBreakAccessor extends HiveAccessor { public HiveLineBreakAccessor(InputData input) throws Exception { super(input, new TextInputFormat()); ((TextInputFormat) inputFormat).configure(jobConf); - String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name()); - initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]); - filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE); + initPartitionFields(hiveUserData.getPartitionKeys()); + filterInFragmenter = hiveUserData.isFilterInFragmenter(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java index 91f91e7..3d04bc1 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java @@ -21,19 +21,27 @@ package org.apache.hawq.pxf.plugins.hive; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; - +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hawq.pxf.api.Metadata; import 
org.apache.hawq.pxf.api.MetadataFetcher; +import org.apache.hawq.pxf.api.OutputFormat; import org.apache.hawq.pxf.api.UnsupportedTypeException; import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.ProfilesConf; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.service.ProfileFactory; /** * Class for connecting to Hive's MetaStore and getting schema of Hive tables. @@ -42,12 +50,14 @@ public class HiveMetadataFetcher extends MetadataFetcher { private static final Log LOG = LogFactory.getLog(HiveMetadataFetcher.class); private HiveMetaStoreClient client; + private JobConf jobConf; public HiveMetadataFetcher(InputData md) { super(md); // init hive metastore client connection. client = HiveUtilities.initHiveClient(); + jobConf = new JobConf(new Configuration()); } /** @@ -84,6 +94,21 @@ public class HiveMetadataFetcher extends MetadataFetcher { Table tbl = HiveUtilities.getHiveTable(client, tblDesc); getSchema(tbl, metadata); metadataList.add(metadata); + List<Partition> tablePartitions = client.listPartitionsByFilter(tblDesc.getPath(), tblDesc.getName(), "", (short) -1); + Set<OutputFormat> formats = new HashSet<OutputFormat>(); + //If table has partitions - find out all formats + for (Partition tablePartition : tablePartitions) { + String inputFormat = tablePartition.getSd().getInputFormat(); + OutputFormat outputFormat = getOutputFormat(inputFormat); + formats.add(outputFormat); + } + //If table has no partitions - get single format of table + if (tablePartitions.size() == 0 ) { + String inputFormat = tbl.getSd().getInputFormat(); + OutputFormat outputFormat = getOutputFormat(inputFormat); + formats.add(outputFormat); + } + metadata.setFormats(formats); } catch (UnsupportedTypeException | UnsupportedOperationException e) { if(ignoreErrors) { LOG.warn("Metadata fetch for " + tblDesc.toString() + " failed. 
" + e.getMessage()); @@ -135,4 +160,18 @@ public class HiveMetadataFetcher extends MetadataFetcher { throw new UnsupportedTypeException(errorMsg); } } + + private OutputFormat getOutputFormat(String inputFormat) { + OutputFormat outputFormat = OutputFormat.UNKNOWN; + try { + InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(inputFormat, jobConf); + String profile = ProfileFactory.get(fformat); + String outputFormatString = ProfilesConf.getProfilePluginsMap(profile).get("X-GP-OUTPUTFORMAT"); + outputFormat = OutputFormat.valueOf(outputFormatString); + } catch (Exception e) { + LOG.warn("Unable to get output format for input format: " + inputFormat); + } + return outputFormat; + } + } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java index dc195f4..be29eec 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java @@ -30,6 +30,9 @@ import org.apache.hawq.pxf.api.BasicFilter; import org.apache.hawq.pxf.api.LogicalFilter; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +//import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +//import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; import org.apache.commons.lang.StringUtils; import java.sql.Date; @@ -37,7 +40,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; 
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; /** * Specialization of HiveAccessor for a Hive table that stores only ORC files. @@ -61,9 +64,9 @@ public class HiveORCAccessor extends HiveAccessor { */ public HiveORCAccessor(InputData input) throws Exception { super(input, new OrcInputFormat()); - String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.ORC_SERDE.name()); - initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]); - filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE); + initPartitionFields(hiveUserData.getPartitionKeys()); + filterInFragmenter = hiveUserData.isFilterInFragmenter(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java index 7673713..381c407 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java @@ -34,6 +34,7 @@ import org.apache.hawq.pxf.api.io.DataType; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; import java.util.*; @@ -44,7 +45,7 @@ import java.util.*; public class HiveORCSerdeResolver extends HiveResolver { private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class); private OrcSerde deserializer; - private 
HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType; + private HiveUtilities.PXF_HIVE_SERDES serdeType; public HiveORCSerdeResolver(InputData input) throws Exception { super(input); @@ -53,14 +54,9 @@ public class HiveORCSerdeResolver extends HiveResolver { /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */ @Override void parseUserData(InputData input) throws Exception { - String[] toks = HiveInputFormatFragmenter.parseToks(input); - String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE]; - if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) { - serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE; - } else { - throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr); - } - partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS]; + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE); + serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName()); + partitionKeys = hiveUserData.getPartitionKeys(); collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM : input.getUserProperty("COLLECTION_DELIM"); mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM @@ -72,6 +68,7 @@ public class HiveORCSerdeResolver extends HiveResolver { * OneField item contains two fields: an integer representing the VARCHAR type and a Java * Object representing the field value. 
*/ + //TODO: It's the same as in parent class @Override public List<OneField> getFields(OneRow onerow) throws Exception { @@ -117,7 +114,7 @@ public class HiveORCSerdeResolver extends HiveResolver { serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString()); serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString()); - if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) { + if (serdeType == HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE) { deserializer = new OrcSerde(); } else { throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java index 2686851..7132d7b 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java @@ -21,7 +21,7 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.hawq.pxf.api.utilities.InputData; - +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.RCFileRecordReader; import org.apache.hadoop.mapred.FileSplit; @@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.JobConf; import java.io.IOException; -import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; +import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; /** * Specialization of HiveAccessor for a Hive table that stores only RC files. 
@@ -47,9 +47,9 @@ public class HiveRCFileAccessor extends HiveAccessor { */ public HiveRCFileAccessor(InputData input) throws Exception { super(input, new RCFileInputFormat()); - String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name()); - initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]); - filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.COLUMNAR_SERDE, PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE); + initPartitionFields(hiveUserData.getPartitionKeys()); + filterInFragmenter = hiveUserData.isFilterInFragmenter(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java index 3837f78..55d7205 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java @@ -27,6 +27,7 @@ import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.Plugin; import org.apache.hawq.pxf.api.utilities.Utilities; import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.commons.lang.CharUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,10 +75,13 @@ public class HiveResolver extends Plugin implements ReadResolver { protected static final String COLLECTION_DELIM = ","; protected String collectionDelim; protected String mapkeyDelim; - private SerDe 
deserializer; + //private SerDe deserializer; + protected SerDe deserializer; private List<OneField> partitionFields; - private String serdeName; - private String propsString; + //private String serdeClassName; + protected String serdeClassName; + //private String propsString; + protected String propsString; String partitionKeys; protected char delimiter; String nullChar = "\\N"; @@ -135,17 +139,11 @@ public class HiveResolver extends Plugin implements ReadResolver { void parseUserData(InputData input) throws Exception { final int EXPECTED_NUM_OF_TOKS = 5; - String userData = new String(input.getFragmentUserData()); - String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input); - if (toks.length != EXPECTED_NUM_OF_TOKS) { - throw new UserDataException("HiveResolver expected " - + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length); - } - - serdeName = toks[1]; - propsString = toks[2]; - partitionKeys = toks[3]; + serdeClassName = hiveUserData.getSerdeClassName(); + propsString = hiveUserData.getPropertiesString(); + partitionKeys = hiveUserData.getPartitionKeys(); collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? 
COLLECTION_DELIM : input.getUserProperty("COLLECTION_DELIM"); @@ -160,14 +158,14 @@ public class HiveResolver extends Plugin implements ReadResolver { void initSerde(InputData inputData) throws Exception { Properties serdeProperties; - Class<?> c = Class.forName(serdeName, true, JavaUtils.getClassLoader()); + Class<?> c = Class.forName(serdeClassName, true, JavaUtils.getClassLoader()); deserializer = (SerDe) c.newInstance(); serdeProperties = new Properties(); - ByteArrayInputStream inStream = new ByteArrayInputStream( - propsString.getBytes()); - serdeProperties.load(inStream); - deserializer.initialize(new JobConf(conf, HiveResolver.class), - serdeProperties); + if (propsString != null ) { + ByteArrayInputStream inStream = new ByteArrayInputStream(propsString.getBytes()); + serdeProperties.load(inStream); + } + deserializer.initialize(new JobConf(conf, HiveResolver.class), serdeProperties); } /* @@ -271,7 +269,7 @@ public class HiveResolver extends Plugin implements ReadResolver { * The partition fields are initialized one time based on userData provided * by the fragmenter. 
*/ - void initPartitionFields(StringBuilder parts) { + void initTextPartitionFields(StringBuilder parts) { if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) { return; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java index fdc5f69..c7cfb36 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java @@ -22,7 +22,10 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.hawq.pxf.api.OneField; import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.OutputFormat; import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.service.utilities.ProtocolData; import java.util.Collections; import java.util.List; @@ -42,21 +45,34 @@ public class HiveStringPassResolver extends HiveResolver { @Override void parseUserData(InputData input) throws Exception { - String userData = new String(input.getFragmentUserData()); - String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM); + HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input); parseDelimiterChar(input); parts = new StringBuilder(); - partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS]; + partitionKeys = hiveUserData.getPartitionKeys(); + serdeClassName = hiveUserData.getSerdeClassName(); + + /* Needed only for BINARY format*/ + if (((ProtocolData) inputData).outputFormat() == OutputFormat.BINARY) { + propsString = hiveUserData.getPropertiesString(); + } } @Override - void 
initSerde(InputData input) { - /* nothing to do here */ + void initSerde(InputData input) throws Exception { + if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) { + /* nothing to do here */ + } else { + super.initSerde(input); + } } @Override void initPartitionFields() { - initPartitionFields(parts); + if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) { + initTextPartitionFields(parts); + } else { + super.initPartitionFields(); + } } /** @@ -66,9 +82,17 @@ public class HiveStringPassResolver extends HiveResolver { */ @Override public List<OneField> getFields(OneRow onerow) throws Exception { - String line = (onerow.getData()).toString(); + if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) { + String line = (onerow.getData()).toString(); + + /* We follow Hive convention. Partition fields are always added at the end of the record */ + return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts)); + } else { + return super.getFields(onerow); + } + } - /* We follow Hive convention. Partition fields are always added at the end of the record */ - return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts)); + void parseDelimiterChar(InputData input) { + this.delimiter = 1; } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java new file mode 100644 index 0000000..710700a --- /dev/null +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Immutable value object carrying the per-fragment metadata that the Hive
 * fragmenters ship to accessors/resolvers: input format, serde class,
 * serialized table properties, partition keys and whether filtering already
 * happened in the fragmenter.
 *
 * Serialized form is the delimiter-joined field list produced by
 * {@link #toString()} and parsed back by HiveUtilities.parseHiveUserData.
 * Fields may be null (e.g. propertiesString when properties are not shipped);
 * no validation is performed here.
 */
public class HiveUserData {

    /** Delimiter between serialized fields; must not occur inside any field. */
    public static final String HIVE_UD_DELIM = "!HUDD!";

    private final String inputFormatName;
    private final String serdeClassName;
    private final String propertiesString;
    private final String partitionKeys;
    private final boolean filterInFragmenter;

    public HiveUserData(String inputFormatName, String serdeClassName,
            String propertiesString, String partitionKeys,
            boolean filterInFragmenter) {
        this.inputFormatName = inputFormatName;
        this.serdeClassName = serdeClassName;
        this.propertiesString = propertiesString;
        this.partitionKeys = partitionKeys;
        this.filterInFragmenter = filterInFragmenter;
    }

    /** @return fully qualified Hadoop input format class name */
    public String getInputFormatName() {
        return inputFormatName;
    }

    /** @return fully qualified serde class name */
    public String getSerdeClassName() {
        return serdeClassName;
    }

    /** @return serialized table properties, possibly null */
    public String getPropertiesString() {
        return propertiesString;
    }

    /** @return serialized partition keys */
    public String getPartitionKeys() {
        return partitionKeys;
    }

    /** @return true when the fragmenter already applied the filter */
    public boolean isFilterInFragmenter() {
        return filterInFragmenter;
    }

    /** Serialized wire form; consumed by HiveUtilities.parseHiveUserData. */
    @Override
    public String toString() {
        return inputFormatName + HiveUserData.HIVE_UD_DELIM
                + serdeClassName + HiveUserData.HIVE_UD_DELIM
                + propertiesString + HiveUserData.HIVE_UD_DELIM
                + partitionKeys + HiveUserData.HIVE_UD_DELIM
                + filterInFragmenter;
    }
}
filterInFragmenter; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java index ffd66b8..b78c379 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java @@ -39,13 +39,16 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hawq.pxf.api.Fragmenter; import org.apache.hawq.pxf.api.Metadata; import org.apache.hawq.pxf.api.UnsupportedTypeException; +import org.apache.hawq.pxf.api.UserDataException; import org.apache.hawq.pxf.api.utilities.EnumHawqType; +import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.io.DataType; import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter; import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter; import org.apache.hawq.pxf.plugins.hive.HiveTablePartition; import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS; -import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; +import org.apache.hawq.pxf.plugins.hive.HiveUserData; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; /** * Class containing helper functions connecting @@ -53,6 +56,38 @@ import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDE */ public class HiveUtilities { + /** Defines the Hive serializers (serde classes) currently supported in pxf */ + public enum PXF_HIVE_SERDES { + COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"), + 
LAZY_BINARY_COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"), + LAZY_SIMPLE_SERDE("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"), + ORC_SERDE("org.apache.hadoop.hive.ql.io.orc.OrcSerde"); + + private String serdeClassName; + + PXF_HIVE_SERDES(String serdeClassName) { + this.serdeClassName = serdeClassName; + } + + public static PXF_HIVE_SERDES getPxfHiveSerde(String serdeClassName, PXF_HIVE_SERDES... allowedSerdes) { + for (PXF_HIVE_SERDES s : values()) { + if (s.getSerdeClassName().equals(serdeClassName)) { + + if (allowedSerdes.length > 0 + && !Arrays.asList(allowedSerdes).contains(s)) { + throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeClassName); + } + return s; + } + } + throw new UnsupportedTypeException("Unable to find serde for class name: "+ serdeClassName); + } + + public String getSerdeClassName() { + return serdeClassName; + } + } + private static final Log LOG = LogFactory.getLog(HiveUtilities.class); private static final String WILDCARD = "*"; @@ -64,10 +99,7 @@ public class HiveUtilities { static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat"; static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat"; static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; - static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"; - static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"; - static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; - static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; + private static final int EXPECTED_NUM_OF_TOKS = 5; /** * Initializes the HiveMetaStoreClient @@ -376,31 +408,6 @@ public class HiveUtilities { } } - /* - * Validates that partition serde corresponds to PXF supported serdes 
and - * transforms the class name to an enumeration for writing it to the - * resolvers on other PXF instances. - */ - private static String assertSerde(String className, HiveTablePartition partData) - throws Exception { - switch (className) { - case STR_COLUMNAR_SERDE: - return PXF_HIVE_SERDES.COLUMNAR_SERDE.name(); - case STR_LAZY_BINARY_COLUMNAR_SERDE: - return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name(); - case STR_LAZY_SIMPLE_SERDE: - return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name(); - case STR_ORC_SERDE: - return PXF_HIVE_SERDES.ORC_SERDE.name(); - default: - throw new UnsupportedTypeException( - "HiveInputFormatFragmenter does not yet support " - + className + " for " + partData - + ". Supported serializers are: " - + Arrays.toString(PXF_HIVE_SERDES.values())); - } - } - /* Turns the partition keys into a string */ public static String serializePartitionKeys(HiveTablePartition partData) throws Exception { @@ -432,7 +439,7 @@ public class HiveUtilities { @SuppressWarnings("unchecked") public static byte[] makeUserData(String fragmenterClassName, HiveTablePartition partData, boolean filterInFragmenter) throws Exception { - String userData = null; + HiveUserData hiveUserData = null; if (fragmenterClassName == null) { throw new IllegalArgumentException("No fragmenter provided."); @@ -440,24 +447,36 @@ public class HiveUtilities { Class fragmenterClass = Class.forName(fragmenterClassName); + String inputFormatName = partData.storageDesc.getInputFormat(); + String serdeClassName = partData.storageDesc.getSerdeInfo().getSerializationLib(); + String propertiesString = serializeProperties(partData.properties); + String partitionKeys = serializePartitionKeys(partData); + if (HiveInputFormatFragmenter.class.isAssignableFrom(fragmenterClass)) { - String inputFormatName = partData.storageDesc.getInputFormat(); - String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib(); - String partitionKeys = serializePartitionKeys(partData); 
assertFileType(inputFormatName, partData); - userData = assertSerde(serdeName, partData) + HiveDataFragmenter.HIVE_UD_DELIM - + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter; - } else if (HiveDataFragmenter.class.isAssignableFrom(fragmenterClass)){ - String inputFormatName = partData.storageDesc.getInputFormat(); - String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib(); - String propertiesString = serializeProperties(partData.properties); - String partitionKeys = serializePartitionKeys(partData); - userData = inputFormatName + HiveDataFragmenter.HIVE_UD_DELIM + serdeName - + HiveDataFragmenter.HIVE_UD_DELIM + propertiesString + HiveDataFragmenter.HIVE_UD_DELIM - + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter; - } else { - throw new IllegalArgumentException("HiveUtilities#makeUserData is not implemented for " + fragmenterClassName); } - return userData.getBytes(); + + hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter); + + return hiveUserData.toString().getBytes(); + } + + public static HiveUserData parseHiveUserData(InputData input, PXF_HIVE_SERDES... 
supportedSerdes) throws UserDataException{ + String userData = new String(input.getFragmentUserData()); + String[] toks = userData.split(HiveUserData.HIVE_UD_DELIM); + + if (toks.length != (EXPECTED_NUM_OF_TOKS)) { + throw new UserDataException("HiveInputFormatFragmenter expected " + + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length); + } + + HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4])); + + if (supportedSerdes.length > 0) { + /* Make sure this serde is supported */ + PXF_HIVE_SERDES pxfHiveSerde = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName(), supportedSerdes); + } + + return hiveUserData; } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java index 7bbe811..1d90d01 100644 --- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java +++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java @@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.*; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -43,7 +45,7 @@ import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) -@PrepareForTest({HiveORCAccessor.class, HiveInputFormatFragmenter.class, HdfsUtilities.class, HiveDataFragmenter.class}) 
+@PrepareForTest({HiveORCAccessor.class, HiveUtilities.class, HdfsUtilities.class, HiveDataFragmenter.class}) @SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf", "org.apache.hadoop.hive.metastore.api.MetaException", "org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities"}) // Prevents static inits @@ -61,8 +63,9 @@ public class HiveORCAccessorTest { jobConf = new JobConf(); PowerMockito.whenNew(JobConf.class).withAnyArguments().thenReturn(jobConf); - PowerMockito.mockStatic(HiveInputFormatFragmenter.class); - PowerMockito.when(HiveInputFormatFragmenter.parseToks(any(InputData.class), any(String[].class))).thenReturn(new String[]{"", HiveDataFragmenter.HIVE_NO_PART_TBL, "true"}); + PowerMockito.mockStatic(HiveUtilities.class); + PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true)); + PowerMockito.mockStatic(HdfsUtilities.class); PowerMockito.mockStatic(HiveDataFragmenter.class); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java index fc5ed0f..d053760 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java @@ -19,26 +19,30 @@ package org.apache.hawq.pxf.service; * under the License. 
*/ +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.TextInputFormat; public class ProfileFactory { private static final String HIVE_TEXT_PROFILE = "HiveText"; private static final String HIVE_RC_PROFILE = "HiveRC"; private static final String HIVE_ORC_PROFILE = "HiveORC"; + private static final String HIVE_PROFILE = "Hive"; - public static String get(InputFormat inputFormat) throws Exception { + public static String get(InputFormat inputFormat) { String profileName = null; - // TODO: Uncomment in process of HAWQ-1228 implementation - //if (inputFormat instanceof TextInputFormat) { - // profileName = HIVE_TEXT_PROFILE; - //} else if (inputFormat instanceof RCFileInputFormat) { - // profileName = HIVE_RC_PROFILE; - /*} else */if (inputFormat instanceof OrcInputFormat) { + if (inputFormat instanceof TextInputFormat) { + profileName = HIVE_TEXT_PROFILE; + } else if (inputFormat instanceof RCFileInputFormat) { + profileName = HIVE_RC_PROFILE; + } else if (inputFormat instanceof OrcInputFormat) { profileName = HIVE_ORC_PROFILE; + } else { + //Default case + profileName = HIVE_PROFILE; } - return profileName; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml index 1edb6d5..a3f21f6 100644 --- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml +++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml @@ -49,6 +49,7 @@ under the License. 
<accessor>org.apache.hawq.pxf.plugins.hive.HiveAccessor</accessor> <resolver>org.apache.hawq.pxf.plugins.hive.HiveResolver</resolver> <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata> + <outputFormat>BINARY</outputFormat> </plugins> </profile> <profile> @@ -63,6 +64,7 @@ under the License. <accessor>org.apache.hawq.pxf.plugins.hive.HiveRCFileAccessor</accessor> <resolver>org.apache.hawq.pxf.plugins.hive.HiveColumnarSerdeResolver</resolver> <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata> + <outputFormat>TEXT</outputFormat> </plugins> </profile> <profile> @@ -76,6 +78,7 @@ under the License. <accessor>org.apache.hawq.pxf.plugins.hive.HiveLineBreakAccessor</accessor> <resolver>org.apache.hawq.pxf.plugins.hive.HiveStringPassResolver</resolver> <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata> + <outputFormat>TEXT</outputFormat> </plugins> </profile> <profile> @@ -89,6 +92,7 @@ under the License. <accessor>org.apache.hawq.pxf.plugins.hive.HiveORCAccessor</accessor> <resolver>org.apache.hawq.pxf.plugins.hive.HiveORCSerdeResolver</resolver> <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata> + <outputFormat>BINARY</outputFormat> </plugins> </profile> <profile> http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/backend/access/external/fileam.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c index 70a115a..20f662d 100644 --- a/src/backend/access/external/fileam.c +++ b/src/backend/access/external/fileam.c @@ -461,8 +461,12 @@ external_stopscan(FileScanDesc scan) ExternalSelectDesc external_getnext_init(PlanState *state) { ExternalSelectDesc desc = (ExternalSelectDesc) palloc0(sizeof(ExternalSelectDescData)); + if (state != NULL) + { desc->projInfo = state->ps_ProjInfo; + desc->fmttype = &((ExternalScan *) state->plan)->fmtType; + } return desc; } 
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/backend/catalog/external/externalmd.c ---------------------------------------------------------------------- diff --git a/src/backend/catalog/external/externalmd.c b/src/backend/catalog/external/externalmd.c index 0e39d25..ccdbdd6 100644 --- a/src/backend/catalog/external/externalmd.c +++ b/src/backend/catalog/external/externalmd.c @@ -57,6 +57,8 @@ static void LoadDistributionPolicy(Oid relid, PxfItem *pxfItem); static void LoadExtTable(Oid relid, PxfItem *pxfItem); static void LoadColumns(Oid relid, List *columns); static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nTypeMod); +static Datum GetFormatTypeForProfile(const List *outputFormats); +static Datum GetFormatOptionsForProfile(const List *outputFormats); const int maxNumTypeModifiers = 2; /* @@ -128,7 +130,22 @@ static PxfItem *ParsePxfItem(struct json_object *pxfMD, char* profile) pxfItem->profile = profile; pxfItem->path = pstrdup(json_object_get_string(itemPath)); pxfItem->name = pstrdup(json_object_get_string(itemName)); - + + /* parse output formats */ + struct json_object *jsonOutputFormats = json_object_object_get(pxfMD, "outputFormats"); + + if (NULL != jsonOutputFormats) + { + const int numOutputFormats = json_object_array_length(jsonOutputFormats); + for (int i = 0; i < numOutputFormats; i++) + { + PxfField *pxfField = palloc0(sizeof(PxfField)); + struct json_object *jsonOutputFormat = json_object_array_get_idx(jsonOutputFormats, i); + char *outupFormat = pstrdup(json_object_get_string(jsonOutputFormat)); + pxfItem->outputFormats = lappend(pxfItem->outputFormats, outupFormat); + } + } + elog(DEBUG1, "Parsed item %s, namespace %s", itemName, itemPath); /* parse columns */ @@ -464,17 +481,10 @@ static void LoadExtTable(Oid relid, PxfItem *pxfItem) Assert(NULL != astate); Datum location = makeArrayResult(astate, CurrentMemoryContext); - /* format options - should be "formatter 'pxfwritable_import'" */ - 
StringInfoData formatStr; - initStringInfo(&formatStr); - appendStringInfo(&formatStr, "formatter 'pxfwritable_import'"); - Datum format_opts = DirectFunctionCall1(textin, CStringGetDatum(formatStr.data)); - pfree(formatStr.data); - values[Anum_pg_exttable_reloid - 1] = ObjectIdGetDatum(relid); values[Anum_pg_exttable_location - 1] = location; - values[Anum_pg_exttable_fmttype - 1] = CharGetDatum('b' /* binary */); - values[Anum_pg_exttable_fmtopts - 1] = format_opts; + values[Anum_pg_exttable_fmttype - 1] = GetFormatTypeForProfile(pxfItem->outputFormats); + values[Anum_pg_exttable_fmtopts - 1] = GetFormatOptionsForProfile(pxfItem->outputFormats); nulls[Anum_pg_exttable_command - 1] = true; nulls[Anum_pg_exttable_rejectlimit - 1] = true; nulls[Anum_pg_exttable_rejectlimittype - 1] = true; @@ -631,3 +641,30 @@ static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nT return VARHDRSZ + result; } +static Datum GetFormatTypeForProfile(const List *outputFormats) +{ + + if (list_length(outputFormats) == 1 && strcmp(lfirst(list_head(outputFormats)),"TEXT") == 0) + { + return CharGetDatum(TextFormatType); + } else + { + return CharGetDatum(CustomFormatType); + } +} + +static Datum GetFormatOptionsForProfile(const List *outputFormats) +{ + StringInfoData formatStr; + initStringInfo(&formatStr); + if (list_length(outputFormats) == 1 && strcmp(lfirst(list_head(outputFormats)),"TEXT") == 0) + { + appendStringInfo(&formatStr, "delimiter '\x01' null '\N' escape '\'"); + } else { + appendStringInfo(&formatStr, "formatter 'pxfwritable_import'"); + } + Datum format_opts = DirectFunctionCall1(textin, CStringGetDatum(formatStr.data)); + pfree(formatStr.data); + return format_opts; +} + http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/bin/gpfusion/gpbridgeapi.c ---------------------------------------------------------------------- diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c index b524df8..c5c217c 100644 --- 
a/src/bin/gpfusion/gpbridgeapi.c +++ b/src/bin/gpfusion/gpbridgeapi.c @@ -49,7 +49,7 @@ gphadoop_context* create_context(PG_FUNCTION_ARGS); void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS); void append_churl_header_if_exists(gphadoop_context* context, const char* key, const char* value); -void set_current_fragment_headers(gphadoop_context* context); +void set_current_fragment_headers(gphadoop_context* context, char *fmttype); void gpbridge_import_start(PG_FUNCTION_ARGS); void gpbridge_export_start(PG_FUNCTION_ARGS); PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel); @@ -62,6 +62,7 @@ void build_uri_for_write(gphadoop_context* context, PxfServer* rest_server); size_t fill_buffer(gphadoop_context* context, char* start, size_t size); void add_delegation_token(PxfInputData *inputData); void free_token_resources(PxfInputData *inputData); +static void assign_optimal_supported_profile(char *profile, char *fmttype, char **supportedProfile, char **supportedFormat); /* Custom protocol entry point for read */ @@ -207,7 +208,7 @@ void append_churl_header_if_exists(gphadoop_context* context, const char* key, c * 2. X-GP-FRAGMENT-USER-DATA header is changed to the current fragment's user data. * If the fragment doesn't have user data, the header will be removed. 
*/ -void set_current_fragment_headers(gphadoop_context* context) +void set_current_fragment_headers(gphadoop_context* context, char *fmttype) { FragmentData* frag_data = (FragmentData*)lfirst(context->current_fragment); elog(DEBUG2, "pxf: set_current_fragment_source_name: source_name %s, index %s, has user data: %s ", @@ -229,11 +230,21 @@ void set_current_fragment_headers(gphadoop_context* context) /* if current fragment has optimal profile set it*/ if (frag_data->profile) { - churl_headers_override(context->churl_headers, "X-GP-PROFILE", frag_data->profile); + char *supportedProfile = NULL; + char *supportedFormat = NULL; + assign_optimal_supported_profile(frag_data->profile, fmttype, &supportedProfile, &supportedFormat); + + churl_headers_override(context->churl_headers, "X-GP-PROFILE", supportedProfile); + churl_headers_override(context->churl_headers, "X-GP-FORMAT", supportedFormat); } else if (context->gphd_uri->profile) { /* if current fragment doesn't have any optimal profile, set to use profile from url */ - churl_headers_override(context->churl_headers, "X-GP-PROFILE", context->gphd_uri->profile); + char *supportedProfile = NULL; + char *supportedFormat = NULL; + assign_optimal_supported_profile(context->gphd_uri->profile, fmttype, &supportedProfile, &supportedFormat); + + churl_headers_override(context->churl_headers, "X-GP-PROFILE", supportedProfile); + churl_headers_override(context->churl_headers, "X-GP-FORMAT", supportedFormat); } /* if there is no profile passed in url, we expect to have accessor+fragmenter+resolver so no action needed by this point */ @@ -249,7 +260,8 @@ void gpbridge_import_start(PG_FUNCTION_ARGS) context->churl_headers = churl_headers_init(); add_querydata_to_http_header(context, fcinfo); - set_current_fragment_headers(context); + char *fmttype = EXTPROTOCOL_GET_FMTTYPE(fcinfo); + set_current_fragment_headers(context, fmttype); context->churl_handle = churl_init_download(context->uri.data, context->churl_headers); @@ -399,6 
+411,7 @@ PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel) size_t gpbridge_read(PG_FUNCTION_ARGS) { char* databuf; + char* fmttype; size_t datalen; size_t n = 0; gphadoop_context* context; @@ -406,6 +419,7 @@ size_t gpbridge_read(PG_FUNCTION_ARGS) context = EXTPROTOCOL_GET_USER_CTX(fcinfo); databuf = EXTPROTOCOL_GET_DATABUF(fcinfo); datalen = EXTPROTOCOL_GET_DATALEN(fcinfo); + fmttype = EXTPROTOCOL_GET_FMTTYPE(fcinfo); while ((n = fill_buffer(context, databuf, datalen)) == 0) { @@ -419,7 +433,7 @@ size_t gpbridge_read(PG_FUNCTION_ARGS) if (context->current_fragment == NULL) return 0; - set_current_fragment_headers(context); + set_current_fragment_headers(context, fmttype); churl_download_restart(context->churl_handle, context->uri.data, context->churl_headers); /* read some bytes to make sure the connection is established */ @@ -547,3 +561,20 @@ void free_token_resources(PxfInputData *inputData) pfree(inputData->token); } + +static void assign_optimal_supported_profile(char *profile, char *fmttype, char **supportedProfile, char **supportedFormat) +{ + if (fmttype_is_text(*fmttype) && ((strcmp(profile, "HiveText") == 0) || (strcmp(profile, "HiveRc") == 0))) + { + *supportedFormat = "TEXT"; + *supportedProfile = profile; + } else if (fmttype_is_custom(*fmttype)) + { + *supportedFormat = "GPDBWritable"; + *supportedProfile = profile; + } else + { + *supportedFormat = "GPDBWritable"; + *supportedProfile = "Hive"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/access/extprotocol.h ---------------------------------------------------------------------- diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h index 4b69bb7..4c37ac6 100644 --- a/src/include/access/extprotocol.h +++ b/src/include/access/extprotocol.h @@ -66,6 +66,7 @@ typedef ExtProtocolData *ExtProtocol; #define EXTPROTOCOL_GET_USER_CTX(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_user_ctx) #define 
EXTPROTOCOL_GET_SELECTDESC(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc) #define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->projInfo) +#define EXTPROTOCOL_GET_FMTTYPE(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->fmttype) #define EXTPROTOCOL_IS_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call) #define EXTPROTOCOL_SET_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call = true) http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/access/fileam.h ---------------------------------------------------------------------- diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h index 1e926d5..df8c284 100644 --- a/src/include/access/fileam.h +++ b/src/include/access/fileam.h @@ -70,6 +70,7 @@ typedef ExternalInsertDescData *ExternalInsertDesc; typedef struct ExternalSelectDescData { ProjectionInfo *projInfo; + char *fmttype; } ExternalSelectDescData; typedef enum DataLineStatus http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/catalog/external/itemmd.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/external/itemmd.h b/src/include/catalog/external/itemmd.h index e6dad63..5717a53 100644 --- a/src/include/catalog/external/itemmd.h +++ b/src/include/catalog/external/itemmd.h @@ -67,6 +67,9 @@ typedef struct PxfItem /* fields */ List *fields; + + /* output formats*/ + List *outputFormats; } PxfItem; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/catalog/pg_exttable.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_exttable.h b/src/include/catalog/pg_exttable.h index 3a0fadd..ae2fb00 100644 --- a/src/include/catalog/pg_exttable.h +++ b/src/include/catalog/pg_exttable.h @@ -164,8 +164,12 @@ GetExtTableEntry(Oid relid); extern void RemoveExtTableEntry(Oid relid); -#define 
fmttype_is_custom(c) (c == 'b') -#define fmttype_is_text(c) (c == 't') -#define fmttype_is_csv(c) (c == 'c') +#define CustomFormatType 'b' +#define TextFormatType 't' +#define CsvFormatType 'c' + +#define fmttype_is_custom(c) (c == CustomFormatType) +#define fmttype_is_text(c) (c == TextFormatType) +#define fmttype_is_csv(c) (c == CsvFormatType) #endif /* PG_EXTTABLE_H */
