Repository: incubator-hawq
Updated Branches:
  refs/heads/master 6a6fa2755 -> fbef55d49
HAWQ-1178. Enhance Fragmenter api to return profile name.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/9cd42ada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/9cd42ada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/9cd42ada

Branch: refs/heads/master
Commit: 9cd42ada65f20b6181c26e03215e9fe81de88d49
Parents: 6a6fa27
Author: Oleksandr Diachenko <[email protected]>
Authored: Wed Dec 21 22:33:06 2016 -0800
Committer: Oleksandr Diachenko <[email protected]>
Committed: Wed Dec 21 22:33:06 2016 -0800

----------------------------------------------------------------------
 pxf/build.gradle                                 |   6 +
 .../java/org/apache/hawq/pxf/api/Fragment.java   |  22 ++++
 .../hawq/pxf/api/utilities/InputData.java        |  11 ++
 .../pxf/plugins/hive/HiveDataFragmenter.java     | 101 +++------------
 .../plugins/hive/HiveInputFormatFragmenter.java  |  71 ----------
 .../pxf/plugins/hive/HiveTablePartition.java     |  56 ++++++++
 .../plugins/hive/utilities/HiveUtilities.java    | 129 +++++++++++++++++++
 .../apache/hawq/pxf/service/ProfileFactory.java  |  45 +++++++
 .../pxf/service/utilities/ProtocolData.java      |   1 -
 9 files changed, 289 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9cd42ada/pxf/build.gradle
----------------------------------------------------------------------
diff --git a/pxf/build.gradle b/pxf/build.gradle
index aed48b1..385bf08 100644
--- a/pxf/build.gradle
+++ b/pxf/build.gradle
@@ -193,7 +193,12 @@ project('pxf-service') {
         providedCompile "org.apache.hadoop:hadoop-hdfs:$hadoopVersion"
         providedCompile "org.apache.hadoop:hadoop-auth:$hadoopVersion"
         providedCompile "org.apache.hadoop:hadoop-annotations:$hadoopVersion"
+        providedCompile "org.apache.hadoop:hadoop-mapreduce-client-core:$hadoopVersion"
         providedCompile "org.apache.tomcat:tomcat-catalina:$tomcatVersion"
+        providedCompile("org.apache.hive:hive-exec:$hiveVersion") {
+            exclude module: 'calcite-core'
+            exclude module: 'calcite-avatica'
+        }
     }
 
     ospackage {
@@ -342,6 +347,7 @@ project('pxf-hdfs') {
 project('pxf-hive') {
     dependencies {
         compile(project(':pxf-hdfs'))
+        compile(project(':pxf-service'))
         compile("org.apache.hive:hive-exec:$hiveVersion") {
             exclude module: 'calcite-core'
             exclude module: 'calcite-avatica'

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9cd42ada/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragment.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragment.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragment.java
index 130a91d..ff6f131 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragment.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragment.java
@@ -51,6 +51,11 @@ public class Fragment {
     private byte[] userData;
 
     /**
+     * Profile name, recommended for reading given Fragment.
+     */
+    private String profile;
+
+    /**
      * Constructs a Fragment.
      *
      * @param sourceName the resource uri (File path+name, table name, etc.)
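The hunk above only declares the new field; the next hunk adds a five-argument constructor and the profile accessors. As a rough illustration (not part of the patch; the source name, host list, byte arrays and class name below are placeholders), a fragmenter could populate and read the recommended profile like this:

    import org.apache.hawq.pxf.api.Fragment;

    public class FragmentProfileExample {
        public static void main(String[] args) {
            String[] hosts = {"host1", "host2"};   // replica locations (placeholder values)
            byte[] metadata = new byte[0];         // fragment metadata, e.g. block locations (placeholder)
            byte[] userData = new byte[0];         // fragmenter-specific payload (placeholder)

            // The fifth argument names the profile recommended for reading this fragment;
            // "HiveORC" is one of the names ProfileFactory can return.
            Fragment fragment = new Fragment("sales/part-00000", hosts, metadata, userData, "HiveORC");

            System.out.println(fragment.getProfile());   // -> HiveORC
        }
    }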
@@ -83,6 +88,15 @@ public class Fragment { this.userData = userData; } + public Fragment(String sourceName, + String[] hosts, + byte[] metadata, + byte[] userData, + String profile) { + this(sourceName, hosts, metadata, userData); + this.profile = profile; + } + public String getSourceName() { return sourceName; } @@ -118,4 +132,12 @@ public class Fragment { public void setUserData(byte[] userData) { this.userData = userData; } + + public String getProfile() { + return profile; + } + + public void setProfile(String profile) { + this.profile = profile; + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9cd42ada/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java index 891dba8..5afedca 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java @@ -43,6 +43,7 @@ public class InputData { protected boolean filterStringValid; protected String filterString; protected String dataSource; + protected String profile; protected String accessor; protected String resolver; protected String fragmenter; @@ -94,6 +95,7 @@ public class InputData { this.filterStringValid = copy.filterStringValid; this.filterString = copy.filterString; this.dataSource = copy.dataSource; + this.profile = copy.profile; this.accessor = copy.accessor; this.resolver = copy.resolver; this.fragmenter = copy.fragmenter; @@ -246,6 +248,15 @@ public class InputData { } /** + * Returns the profile name. + * + * @return name of profile + */ + public String getProfile() { + return profile; + } + + /** * Returns the ClassName for the java class that was defined as Accessor. * * @return class name for Accessor http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9cd42ada/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java index 97a297e..2d2b53e 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java @@ -56,8 +56,10 @@ import org.apache.hawq.pxf.api.Metadata; import org.apache.hawq.pxf.api.io.DataType; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.ProfilesConf; import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; +import org.apache.hawq.pxf.service.ProfileFactory; /** * Fragmenter class for HIVE tables. 
<br> @@ -76,10 +78,10 @@ public class HiveDataFragmenter extends Fragmenter { private static final Log LOG = LogFactory.getLog(HiveDataFragmenter.class); private static final short ALL_PARTS = -1; - static final String HIVE_UD_DELIM = "!HUDD!"; - static final String HIVE_1_PART_DELIM = "!H1PD!"; - static final String HIVE_PARTITIONS_DELIM = "!HPAD!"; - static final String HIVE_NO_PART_TBL = "!HNPT!"; + public static final String HIVE_UD_DELIM = "!HUDD!"; + public static final String HIVE_1_PART_DELIM = "!H1PD!"; + public static final String HIVE_PARTITIONS_DELIM = "!HPAD!"; + public static final String HIVE_NO_PART_TBL = "!HNPT!"; static final String HIVE_API_EQ = " = "; static final String HIVE_API_LT = " < "; @@ -101,36 +103,6 @@ public class HiveDataFragmenter extends Fragmenter { private Map<String, String> partitionkeyTypes = new HashMap<>(); /** - * A Hive table unit - means a subset of the HIVE table, where we can say - * that for all files in this subset, they all have the same InputFormat and - * Serde. For a partitioned table the HiveTableUnit will be one partition - * and for an unpartitioned table, the HiveTableUnit will be the whole table - */ - class HiveTablePartition { - StorageDescriptor storageDesc; - Properties properties; - Partition partition; - List<FieldSchema> partitionKeys; - String tableName; - - HiveTablePartition(StorageDescriptor storageDesc, - Properties properties, Partition partition, - List<FieldSchema> partitionKeys, String tableName) { - this.storageDesc = storageDesc; - this.properties = properties; - this.partition = partition; - this.partitionKeys = partitionKeys; - this.tableName = tableName; - } - - @Override - public String toString() { - return "table - " + tableName - + ((partition == null) ? "" : ", partition - " + partition); - } - } - - /** * Constructs a HiveDataFragmenter object. 
* * @param inputData all input parameters coming from the client @@ -309,6 +281,19 @@ public class HiveDataFragmenter extends Fragmenter { throws Exception { InputFormat<?, ?> fformat = makeInputFormat( tablePartition.storageDesc.getInputFormat(), jobConf); + String profile = null; + if (inputData.getProfile() != null) { + // evaluate optimal profile based on file format if profile was explicitly specified in url + // if user passed accessor+fragmenter+resolver - use them + profile = ProfileFactory.get(fformat); + } + String fragmenterForProfile = null; + if (profile != null) { + fragmenterForProfile = ProfilesConf.getProfilePluginsMap(profile).get("X-GP-FRAGMENTER"); + } else { + fragmenterForProfile = inputData.getFragmenter(); + } + FileInputFormat.setInputPaths(jobConf, new Path( tablePartition.storageDesc.getLocation())); @@ -327,57 +312,11 @@ public class HiveDataFragmenter extends Fragmenter { byte[] locationInfo = HdfsUtilities.prepareFragmentMetadata(fsp); Fragment fragment = new Fragment(filepath, hosts, locationInfo, - makeUserData(tablePartition)); + HiveUtilities.makeUserData(fragmenterForProfile, tablePartition, filterInFragmenter), profile); fragments.add(fragment); } } - /* Turns a Properties class into a string */ - private String serializeProperties(Properties props) throws Exception { - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - props.store(outStream, ""/* comments */); - return outStream.toString(); - } - - /* Turns the partition keys into a string */ - String serializePartitionKeys(HiveTablePartition partData) throws Exception { - if (partData.partition == null) /* - * this is a simple hive table - there - * are no partitions - */{ - return HIVE_NO_PART_TBL; - } - - StringBuilder partitionKeys = new StringBuilder(); - String prefix = ""; - ListIterator<String> valsIter = partData.partition.getValues().listIterator(); - ListIterator<FieldSchema> keysIter = partData.partitionKeys.listIterator(); - while (valsIter.hasNext() && keysIter.hasNext()) { - FieldSchema key = keysIter.next(); - String name = key.getName(); - String type = key.getType(); - String val = valsIter.next(); - String oneLevel = prefix + name + HIVE_1_PART_DELIM + type - + HIVE_1_PART_DELIM + val; - partitionKeys.append(oneLevel); - prefix = HIVE_PARTITIONS_DELIM; - } - - return partitionKeys.toString(); - } - - byte[] makeUserData(HiveTablePartition partData) throws Exception { - String inputFormatName = partData.storageDesc.getInputFormat(); - String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib(); - String propertiesString = serializeProperties(partData.properties); - String partitionKeys = serializePartitionKeys(partData); - String userData = inputFormatName + HIVE_UD_DELIM + serdeName - + HIVE_UD_DELIM + propertiesString + HIVE_UD_DELIM - + partitionKeys + HIVE_UD_DELIM + filterInFragmenter; - - return userData.getBytes(); - } - /* * Build filter string for HiveMetaStoreClient.listPartitionsByFilter API * method. 
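Read in isolation, the selection logic this change adds to the fragment-building code boils down to the following sketch (not the literal patch code; it assumes ProfilesConf.getProfilePluginsMap returns a Map<String, String>, which is consistent with how the hunk uses it):

    import java.util.Map;

    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hawq.pxf.api.utilities.InputData;
    import org.apache.hawq.pxf.api.utilities.ProfilesConf;
    import org.apache.hawq.pxf.service.ProfileFactory;

    public class FragmenterSelection {

        // Decide which fragmenter class name to record in the fragment's user data.
        static String resolveFragmenter(InputData inputData, InputFormat<?, ?> fformat) throws Exception {
            String profile = null;
            if (inputData.getProfile() != null) {
                // A profile was given in the request URL: re-evaluate the optimal
                // profile for this partition's file format.
                profile = ProfileFactory.get(fformat);
            }
            if (profile != null) {
                Map<String, String> plugins = ProfilesConf.getProfilePluginsMap(profile);
                return plugins.get("X-GP-FRAGMENTER");
            }
            // The user passed accessor + fragmenter + resolver explicitly - keep them.
            return inputData.getFragmenter();
        }
    }

HiveUtilities.makeUserData then serializes the partition description differently depending on which fragmenter name comes back, as the HiveUtilities.java hunks further down show.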
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9cd42ada/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java index b6a6041..051a246 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java @@ -55,14 +55,6 @@ import java.util.List; */ public class HiveInputFormatFragmenter extends HiveDataFragmenter { private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class); - - static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat"; - static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat"; - static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; - static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"; - static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"; - static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; - static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; private static final int EXPECTED_NUM_OF_TOKS = 3; public static final int TOK_SERDE = 0; public static final int TOK_KEYS = 1; @@ -164,69 +156,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter { } - /* - * Validates that partition format corresponds to PXF supported formats and - * transforms the class name to an enumeration for writing it to the - * accessors on other PXF instances. - */ - private String assertFileType(String className, HiveTablePartition partData) - throws Exception { - switch (className) { - case STR_RC_FILE_INPUT_FORMAT: - return PXF_HIVE_INPUT_FORMATS.RC_FILE_INPUT_FORMAT.name(); - case STR_TEXT_FILE_INPUT_FORMAT: - return PXF_HIVE_INPUT_FORMATS.TEXT_FILE_INPUT_FORMAT.name(); - case STR_ORC_FILE_INPUT_FORMAT: - return PXF_HIVE_INPUT_FORMATS.ORC_FILE_INPUT_FORMAT.name(); - default: - throw new IllegalArgumentException( - "HiveInputFormatFragmenter does not yet support " - + className - + " for " - + partData - + ". Supported InputFormat are " - + Arrays.toString(PXF_HIVE_INPUT_FORMATS.values())); - } - } - - /* - * Validates that partition serde corresponds to PXF supported serdes and - * transforms the class name to an enumeration for writing it to the - * resolvers on other PXF instances. - */ - private String assertSerde(String className, HiveTablePartition partData) - throws Exception { - switch (className) { - case STR_COLUMNAR_SERDE: - return PXF_HIVE_SERDES.COLUMNAR_SERDE.name(); - case STR_LAZY_BINARY_COLUMNAR_SERDE: - return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name(); - case STR_LAZY_SIMPLE_SERDE: - return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name(); - case STR_ORC_SERDE: - return PXF_HIVE_SERDES.ORC_SERDE.name(); - default: - throw new UnsupportedTypeException( - "HiveInputFormatFragmenter does not yet support " - + className + " for " + partData - + ". 
Supported serializers are: " - + Arrays.toString(PXF_HIVE_SERDES.values())); - } - } - - @Override - byte[] makeUserData(HiveTablePartition partData) throws Exception { - String inputFormatName = partData.storageDesc.getInputFormat(); - String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib(); - String partitionKeys = serializePartitionKeys(partData); - - assertFileType(inputFormatName, partData); - String userData = assertSerde(serdeName, partData) + HIVE_UD_DELIM - + partitionKeys + HIVE_UD_DELIM + filterInFragmenter; - - return userData.getBytes(); - } - /** * Returns statistics for Hive table. Currently it's not implemented. */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9cd42ada/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveTablePartition.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveTablePartition.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveTablePartition.java new file mode 100644 index 0000000..3905fef --- /dev/null +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveTablePartition.java @@ -0,0 +1,56 @@ +package org.apache.hawq.pxf.plugins.hive; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import java.util.Properties; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.FieldSchema; + +/** + * A Hive table unit - means a subset of the HIVE table, where we can say + * that for all files in this subset, they all have the same InputFormat and + * Serde. For a partitioned table the HiveTableUnit will be one partition + * and for an unpartitioned table, the HiveTableUnit will be the whole table + */ +public class HiveTablePartition { + public StorageDescriptor storageDesc; + public Properties properties; + public Partition partition; + public List<FieldSchema> partitionKeys; + public String tableName; + + HiveTablePartition(StorageDescriptor storageDesc, + Properties properties, Partition partition, + List<FieldSchema> partitionKeys, String tableName) { + this.storageDesc = storageDesc; + this.properties = properties; + this.partition = partition; + this.partitionKeys = partitionKeys; + this.tableName = tableName; + } + + @Override + public String toString() { + return "table - " + tableName + + ((partition == null) ? 
"" : ", partition - " + partition); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9cd42ada/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java index e18fcd8..575b129 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java @@ -20,9 +20,12 @@ package org.apache.hawq.pxf.plugins.hive.utilities; */ +import java.io.ByteArrayOutputStream; import java.util.Arrays; import java.util.List; import java.util.ArrayList; +import java.util.ListIterator; +import java.util.Properties; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -33,10 +36,16 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hawq.pxf.api.Fragmenter; import org.apache.hawq.pxf.api.Metadata; import org.apache.hawq.pxf.api.UnsupportedTypeException; import org.apache.hawq.pxf.api.utilities.EnumHawqType; import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter; +import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter; +import org.apache.hawq.pxf.plugins.hive.HiveTablePartition; +import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS; +import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES; /** * Class containing helper functions connecting @@ -52,6 +61,14 @@ public class HiveUtilities { */ private static final String HIVE_DEFAULT_DBNAME = "default"; + static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat"; + static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat"; + static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"; + static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"; + static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; + static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; + /** * Initializes the HiveMetaStoreClient * Uses classpath configuration files to locate the MetaStore @@ -326,4 +343,116 @@ public class HiveUtilities { break; } } + + /* Turns a Properties class into a string */ + private static String serializeProperties(Properties props) throws Exception { + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + props.store(outStream, ""/* comments */); + return outStream.toString(); + } + + /* + * Validates that partition format corresponds to PXF supported formats and + * transforms the class name to an enumeration for writing it to the + * accessors on other PXF instances. 
+ */ + private static String assertFileType(String className, HiveTablePartition partData) + throws Exception { + switch (className) { + case STR_RC_FILE_INPUT_FORMAT: + return PXF_HIVE_INPUT_FORMATS.RC_FILE_INPUT_FORMAT.name(); + case STR_TEXT_FILE_INPUT_FORMAT: + return PXF_HIVE_INPUT_FORMATS.TEXT_FILE_INPUT_FORMAT.name(); + case STR_ORC_FILE_INPUT_FORMAT: + return PXF_HIVE_INPUT_FORMATS.ORC_FILE_INPUT_FORMAT.name(); + default: + throw new IllegalArgumentException( + "HiveInputFormatFragmenter does not yet support " + + className + + " for " + + partData + + ". Supported InputFormat are " + + Arrays.toString(PXF_HIVE_INPUT_FORMATS.values())); + } + } + + /* + * Validates that partition serde corresponds to PXF supported serdes and + * transforms the class name to an enumeration for writing it to the + * resolvers on other PXF instances. + */ + private static String assertSerde(String className, HiveTablePartition partData) + throws Exception { + switch (className) { + case STR_COLUMNAR_SERDE: + return PXF_HIVE_SERDES.COLUMNAR_SERDE.name(); + case STR_LAZY_BINARY_COLUMNAR_SERDE: + return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name(); + case STR_LAZY_SIMPLE_SERDE: + return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name(); + case STR_ORC_SERDE: + return PXF_HIVE_SERDES.ORC_SERDE.name(); + default: + throw new UnsupportedTypeException( + "HiveInputFormatFragmenter does not yet support " + + className + " for " + partData + + ". Supported serializers are: " + + Arrays.toString(PXF_HIVE_SERDES.values())); + } + } + + + /* Turns the partition keys into a string */ + public static String serializePartitionKeys(HiveTablePartition partData) throws Exception { + if (partData.partition == null) /* + * this is a simple hive table - there + * are no partitions + */{ + return HiveDataFragmenter.HIVE_NO_PART_TBL; + } + + StringBuilder partitionKeys = new StringBuilder(); + String prefix = ""; + ListIterator<String> valsIter = partData.partition.getValues().listIterator(); + ListIterator<FieldSchema> keysIter = partData.partitionKeys.listIterator(); + while (valsIter.hasNext() && keysIter.hasNext()) { + FieldSchema key = keysIter.next(); + String name = key.getName(); + String type = key.getType(); + String val = valsIter.next(); + String oneLevel = prefix + name + HiveDataFragmenter.HIVE_1_PART_DELIM + type + + HiveDataFragmenter.HIVE_1_PART_DELIM + val; + partitionKeys.append(oneLevel); + prefix = HiveDataFragmenter.HIVE_PARTITIONS_DELIM; + } + + return partitionKeys.toString(); + } + + public static byte[] makeUserData(String fragmenterClassName, HiveTablePartition partData, boolean filterInFragmenter) throws Exception { + + String userData = null; + + if (fragmenterClassName == null) { + throw new IllegalArgumentException("No fragmenter provided."); + } + + if (fragmenterClassName.equals("org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter")) { + String inputFormatName = partData.storageDesc.getInputFormat(); + String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib(); + String partitionKeys = serializePartitionKeys(partData); + assertFileType(inputFormatName, partData); + userData = assertSerde(serdeName, partData) + HiveDataFragmenter.HIVE_UD_DELIM + + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter; + } else { + String inputFormatName = partData.storageDesc.getInputFormat(); + String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib(); + String propertiesString = serializeProperties(partData.properties); + String partitionKeys = 
serializePartitionKeys(partData); + userData = inputFormatName + HiveDataFragmenter.HIVE_UD_DELIM + serdeName + + HiveDataFragmenter.HIVE_UD_DELIM + propertiesString + HiveDataFragmenter.HIVE_UD_DELIM + + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter; + } + return userData.getBytes(); + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9cd42ada/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java new file mode 100644 index 0000000..fc5ed0f --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java @@ -0,0 +1,45 @@ +package org.apache.hawq.pxf.service; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.mapred.InputFormat; + +public class ProfileFactory { + + private static final String HIVE_TEXT_PROFILE = "HiveText"; + private static final String HIVE_RC_PROFILE = "HiveRC"; + private static final String HIVE_ORC_PROFILE = "HiveORC"; + + public static String get(InputFormat inputFormat) throws Exception { + String profileName = null; + // TODO: Uncomment in process of HAWQ-1228 implementation + //if (inputFormat instanceof TextInputFormat) { + // profileName = HIVE_TEXT_PROFILE; + //} else if (inputFormat instanceof RCFileInputFormat) { + // profileName = HIVE_RC_PROFILE; + /*} else */if (inputFormat instanceof OrcInputFormat) { + profileName = HIVE_ORC_PROFILE; + } + + return profileName; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9cd42ada/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java index a250b18..a0e63ce 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java @@ -50,7 +50,6 @@ public class ProtocolData extends InputData { protected OutputFormat outputFormat; protected int port; protected String host; - protected String profile; protected String token; // statistics parameters protected int statsMaxFragments;
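The ProtocolData hunk above only removes the local profile field, which now lives in the InputData base class (see the InputData.java hunk earlier). To close the loop, here is a minimal usage sketch for the new ProfileFactory (illustrative only; the class name and instances are placeholders). Until HAWQ-1228 lands, only ORC input formats map to a profile; everything else yields null, and the plugins named in the request are used instead:

    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hawq.pxf.service.ProfileFactory;

    public class ProfileFactoryExample {
        public static void main(String[] args) throws Exception {
            InputFormat<?, ?> orc = new OrcInputFormat();
            System.out.println(ProfileFactory.get(orc));   // -> HiveORC

            // Text and RCFile input formats are still commented out in ProfileFactory,
            // so they return null here and the explicit fragmenter/accessor/resolver
            // from the request is kept.
        }
    }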
