Added: pig/trunk/src/org/apache/pig/impl/util/hive/HiveUtils.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/hive/HiveUtils.java?rev=1671956&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/hive/HiveUtils.java (added) +++ pig/trunk/src/org/apache/pig/impl/util/hive/HiveUtils.java Tue Apr 7 21:24:48 2015 @@ -0,0 +1,780 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.impl.util.hive; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantFloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantIntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantLongObjectInspector; +import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.joda.time.DateTime;

/**
 * Conversion utilities between the Hive type system (ObjectInspector / TypeInfo)
 * and the Pig type system (DataType / ResourceSchema / Tuple, Bag, Map).
 */
public class HiveUtils {

    // Shared factory for building Pig tuples from Hive structs.
    static final TupleFactory tf = TupleFactory.getInstance();

    /**
     * Converts a Hive datum into the equivalent Pig value:
     * struct -&gt; Tuple, map -&gt; Map keyed by String, list -&gt; DataBag of
     * Tuples, primitive -&gt; Pig primitive (see {@link #getPrimaryFromHive}).
     *
     * @param obj             the Hive datum, may be null
     * @param oi              inspector describing {@code obj}
     * @param includedColumns column-pruning mask; applied only to the
     *                        top-level struct (nested calls pass null, i.e.
     *                        keep everything). null means keep all columns
     * @return the converted Pig object, or null when {@code obj} is null
     */
    public static Object convertHiveToPig(Object obj, ObjectInspector oi, boolean[] includedColumns) {
        Object result = null;
        if (obj == null) {
            return result;
        }
        switch (oi.getCategory()) {
        case PRIMITIVE:
            PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi;
            result = getPrimaryFromHive(obj, poi);
            break;
        case STRUCT:
            StructObjectInspector soi = (StructObjectInspector)oi;
            List<StructField> elementFields = (List<StructField>) soi.getAllStructFieldRefs();
            List<Object> items = soi.getStructFieldsDataAsList(obj);
            Tuple t = tf.newTuple();
            for (int i = 0; i < items.size(); i++) {
                // includedColumns only prunes the outermost struct; nested
                // structs are converted in full (mask passed as null below).
                if (includedColumns == null || includedColumns[i]) {
                    Object convertedItem = convertHiveToPig(items.get(i),
                            elementFields.get(i).getFieldObjectInspector(), null);
                    t.append(convertedItem);
                }
            }
            result = t;
            break;
        case MAP:
            MapObjectInspector moi = (MapObjectInspector)oi;
            ObjectInspector keyObjectInspector = moi.getMapKeyObjectInspector();
            ObjectInspector valueObjectInspector = moi.getMapValueObjectInspector();
            Map<Object, Object> m = (Map<Object, Object>)obj;
            Map<String, Object> convertedMap = new HashMap<String, Object>();
            result = convertedMap;
            for (Map.Entry<Object, Object> entry : m.entrySet()) {
                Object convertedKey = convertHiveToPig(entry.getKey(), keyObjectInspector, null);
                Object convertedValue = convertHiveToPig(entry.getValue(), valueObjectInspector, null);
                if (convertedKey != null) {
                    // Pig map keys are always chararray.
                    convertedMap.put(convertedKey.toString(), convertedValue);
                } else {
                    // Null keys cannot be represented in a Pig map; drop the
                    // entry and bump the UDF warning counter instead of failing.
                    PigStatusReporter reporter = PigStatusReporter.getInstance();
                    if (reporter != null) {
                        reporter.incrCounter(PigWarning.UDF_WARNING_1, 1);
                    }
                }
            }
            break;
        case LIST:
            ListObjectInspector loi = (ListObjectInspector)oi;
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            result = bag;
            ObjectInspector itemObjectInspector = loi.getListElementObjectInspector();
            for (Object item : loi.getList(obj)) {
                Object convertedItem = convertHiveToPig(item, itemObjectInspector, null);
                Tuple innerTuple;
                // A Hive array holds bare items of any type; a Pig bag holds
                // tuples, so non-tuple items are wrapped in a 1-field tuple.
                if (convertedItem instanceof Tuple) {
                    innerTuple = (Tuple)convertedItem;
                } else {
                    innerTuple = tf.newTuple(1);
                    try {
                        innerTuple.set(0, convertedItem);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                bag.add(innerTuple);
            }
            break;
        default:
            throw new IllegalArgumentException("Unknown type " +
                    oi.getCategory());
        }
        return result;
    }

    /**
     * Converts a Hive primitive to its Pig representation.
     * Notable mappings: char/varchar -&gt; String, byte/short -&gt; Integer
     * (Pig has no 8/16-bit integer types), binary -&gt; DataByteArray (copy),
     * timestamp/date -&gt; Joda DateTime, decimal -&gt; BigDecimal.
     *
     * @param obj the Hive primitive datum, may be null
     * @param poi inspector describing {@code obj}
     * @return the Pig primitive, or null when {@code obj} is null
     */
    public static Object getPrimaryFromHive(Object obj, PrimitiveObjectInspector poi) {
        Object result = null;
        if (obj == null) {
            return result;
        }
        switch (poi.getPrimitiveCategory()) {
        case FLOAT:
        case DOUBLE:
        case BOOLEAN:
        case INT:
        case LONG:
        case STRING:
            // Java object already matches the Pig representation.
            result = poi.getPrimitiveJavaObject(obj);
            break;
        case CHAR:
            result = ((HiveChar)poi.getPrimitiveJavaObject(obj)).getValue();
            break;
        case VARCHAR:
            result = ((HiveVarchar)poi.getPrimitiveJavaObject(obj)).getValue();
            break;
        case BYTE:
            // Widen to Integer: Pig has no tinyint.
            result = (int)(Byte)poi.getPrimitiveJavaObject(obj);
            break;
        case SHORT:
            // Widen to Integer: Pig has no smallint.
            result = (int)(Short)poi.getPrimitiveJavaObject(obj);
            break;
        case BINARY:
            byte[] b = (byte[])poi.getPrimitiveJavaObject(obj);
            // Make a copy so later mutation of the Hive buffer is not visible.
            result = new DataByteArray(b, 0, b.length);
            break;
        case TIMESTAMP:
            java.sql.Timestamp origTimeStamp = (java.sql.Timestamp)poi.getPrimitiveJavaObject(obj);
            result = new DateTime(origTimeStamp.getTime());
            break;
        case DATE:
            java.sql.Date origDate = (java.sql.Date)poi.getPrimitiveJavaObject(obj);
            result = new DateTime(origDate.getTime());
            break;
        case DECIMAL:
            org.apache.hadoop.hive.common.type.HiveDecimal origDecimal =
                    (org.apache.hadoop.hive.common.type.HiveDecimal)poi.getPrimitiveJavaObject(obj);
            result = origDecimal.bigDecimalValue();
            break;
        default:
            throw new IllegalArgumentException("Unknown primitive type " +
                    (poi).getPrimitiveCategory());
        }
        return result;
    }

    /**
     * Maps a Hive {@link TypeInfo} to a Pig {@link ResourceFieldSchema}.
     * struct -&gt; tuple, list -&gt; bag (items wrapped in a tuple when needed),
     * map -&gt; map (value schema only; Pig map keys are implicitly chararray).
     *
     * @throws IOException propagated from ResourceSchema construction
     */
    public static ResourceFieldSchema getResourceFieldSchema(TypeInfo ti) throws IOException {
        ResourceFieldSchema fieldSchema = new ResourceFieldSchema();
        ResourceFieldSchema[] innerFs;
        ResourceSchema innerSchema;
        switch (ti.getCategory()) {
        case STRUCT:
            StructTypeInfo sti = (StructTypeInfo)ti;
            fieldSchema.setType(DataType.TUPLE);
            List<TypeInfo> typeInfos = sti.getAllStructFieldTypeInfos();
            List<String> names = sti.getAllStructFieldNames();
            innerFs = new ResourceFieldSchema[typeInfos.size()];
            for (int i = 0; i < typeInfos.size(); i++) {
                innerFs[i] = getResourceFieldSchema(typeInfos.get(i));
                innerFs[i].setName(names.get(i));
            }
            innerSchema = new ResourceSchema();
            innerSchema.setFields(innerFs);
            fieldSchema.setSchema(innerSchema);
            break;
        case LIST:
            ListTypeInfo lti = (ListTypeInfo)ti;
            fieldSchema.setType(DataType.BAG);
            innerFs = new ResourceFieldSchema[1];
            ResourceFieldSchema itemSchema = getResourceFieldSchema(lti.getListElementTypeInfo());
            if (itemSchema.getType() == DataType.TUPLE) {
                innerFs[0] = itemSchema;
            } else {
                // A Pig bag contains tuples; wrap a non-tuple item schema.
                ResourceFieldSchema tupleFieldSchema = new ResourceFieldSchema();
                tupleFieldSchema.setType(DataType.TUPLE);
                ResourceSchema tupleSchema = new ResourceSchema();
                tupleSchema.setFields(new ResourceFieldSchema[] {itemSchema});
                // FIX: attach the wrapped item schema to the tuple field;
                // previously tupleSchema was built but never set, so the
                // item's schema was silently dropped.
                tupleFieldSchema.setSchema(tupleSchema);
                innerFs[0] = tupleFieldSchema;
            }

            innerSchema = new ResourceSchema();
            innerSchema.setFields(innerFs);
            fieldSchema.setSchema(innerSchema);
            break;
        case MAP:
            MapTypeInfo mti = (MapTypeInfo)ti;
            fieldSchema.setType(DataType.MAP);
            // Only the value schema is carried; Pig map keys are chararray.
            innerFs = new ResourceFieldSchema[1];
            innerFs[0] = getResourceFieldSchema(mti.getMapValueTypeInfo());
            innerSchema = new ResourceSchema();
            innerSchema.setFields(innerFs);
            fieldSchema.setSchema(innerSchema);
            break;
        case PRIMITIVE:
            switch (((PrimitiveTypeInfo)ti).getPrimitiveCategory()) {
            case FLOAT:
                fieldSchema.setType(DataType.FLOAT);
                break;
            case DOUBLE:
                fieldSchema.setType(DataType.DOUBLE);
                break;
            case BOOLEAN:
                fieldSchema.setType(DataType.BOOLEAN);
                break;
            case BYTE:
            case SHORT:
            case INT:
                // byte/short widen to int: Pig has no 8/16-bit integers.
                fieldSchema.setType(DataType.INTEGER);
                break;
            case LONG:
                fieldSchema.setType(DataType.LONG);
                break;
            case BINARY:
                fieldSchema.setType(DataType.BYTEARRAY);
                break;
            case STRING:
            case VARCHAR:
            case CHAR:
                fieldSchema.setType(DataType.CHARARRAY);
                break;
            case TIMESTAMP:
            case DATE:
                fieldSchema.setType(DataType.DATETIME);
                break;
            case DECIMAL:
                fieldSchema.setType(DataType.BIGDECIMAL);
                break;
            default:
                throw new IllegalArgumentException("Unknown primitive type " +
                        ((PrimitiveTypeInfo)ti).getPrimitiveCategory());
            }
            break;
        }

        return fieldSchema;
    }

    /**
     * Maps a Pig {@link ResourceFieldSchema} to a Hive {@link TypeInfo}.
     * Inverse of {@link #getResourceFieldSchema}: a bag of single-field tuples
     * is unwrapped back to a plain Hive list; a map with no declared value
     * schema defaults its value type to binary; biginteger is approximated as
     * decimal (Hive has no arbitrary-precision integer type).
     *
     * @throws IOException if a bag's inner schema is missing or malformed
     */
    public static TypeInfo getTypeInfo(ResourceFieldSchema fs) throws IOException {
        TypeInfo ti;
        switch (fs.getType()) {
        case DataType.TUPLE:
            ti = new StructTypeInfo();
            ArrayList<String> names = new ArrayList<String>();
            ArrayList<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
            for (ResourceFieldSchema subFs : fs.getSchema().getFields()) {
                TypeInfo info = getTypeInfo(subFs);
                names.add(subFs.getName());
                typeInfos.add(info);
            }
            ((StructTypeInfo)ti).setAllStructFieldNames(names);
            ((StructTypeInfo)ti).setAllStructFieldTypeInfos(typeInfos);
            break;
        case DataType.BAG:
            ti = new ListTypeInfo();
            if (fs.getSchema() == null || fs.getSchema().getFields().length != 1) {
                throw new IOException("Wrong bag inner schema");
            }
            ResourceFieldSchema tupleSchema = fs.getSchema().getFields()[0];
            ResourceFieldSchema itemSchema = tupleSchema;
            // A single-item tuple is an artifact of Pig's bag-of-tuples rule;
            // unwrap it so the Hive list holds the item directly.
            if (tupleSchema.getSchema().getFields().length == 1) {
                itemSchema = tupleSchema.getSchema().getFields()[0];
            }
            TypeInfo elementField = getTypeInfo(itemSchema);
            ((ListTypeInfo)ti).setListElementTypeInfo(elementField);
            break;
        case DataType.MAP:
            ti = new MapTypeInfo();
            TypeInfo valueField;
            // Untyped Pig maps default their Hive value type to binary.
            if (fs.getSchema() == null || fs.getSchema().getFields().length != 1) {
                valueField = TypeInfoFactory.binaryTypeInfo;
            } else {
                valueField = getTypeInfo(fs.getSchema().getFields()[0]);
            }
            ((MapTypeInfo)ti).setMapKeyTypeInfo(TypeInfoFactory.stringTypeInfo);
            ((MapTypeInfo)ti).setMapValueTypeInfo(valueField);
            break;
        case DataType.BOOLEAN:
            ti = TypeInfoFactory.booleanTypeInfo;
            break;
        case DataType.INTEGER:
            ti = TypeInfoFactory.intTypeInfo;
            break;
        case DataType.LONG:
            ti = TypeInfoFactory.longTypeInfo;
            break;
        case DataType.FLOAT:
            ti = TypeInfoFactory.floatTypeInfo;
            break;
        case DataType.DOUBLE:
            ti = TypeInfoFactory.doubleTypeInfo;
            break;
        case DataType.CHARARRAY:
            ti = TypeInfoFactory.stringTypeInfo;
            break;
        case DataType.DATETIME:
            ti = TypeInfoFactory.timestampTypeInfo;
            break;
        case DataType.BIGDECIMAL:
            ti = TypeInfoFactory.decimalTypeInfo;
            break;
        case DataType.BIGINTEGER:
            // Hive has no biginteger; decimal is the closest lossless fit.
            ti = TypeInfoFactory.decimalTypeInfo;
            break;
        case DataType.BYTEARRAY:
            ti = TypeInfoFactory.binaryTypeInfo;
            break;
        default:
            throw new IllegalArgumentException("Unknown data type " +
                    DataType.findTypeName(fs.getType()));
        }
        return ti;
    }

    /**
     * A struct field backed by a positional offset into a Pig tuple.
     */
    static public class Field implements StructField {
        private final String name;
        private final ObjectInspector inspector;
        private final int offset;

        public Field(String name, ObjectInspector inspector, int offset) {
            this.name = name;
            this.inspector = inspector;
            this.offset = offset;
        }

        @Override
        public String getFieldName() {
            return name;
        }

        @Override
        public ObjectInspector getFieldObjectInspector() {
            return inspector;
        }

        @Override
        public int getFieldID() {
            return offset;
        }

        @Override
        public String getFieldComment() {
            return null;
        }
    }

    /**
     * StructObjectInspector that reads its field data out of a Pig Tuple.
     */
    static class PigStructInspector extends StructObjectInspector {
        private final List<StructField> fields;

        PigStructInspector(StructTypeInfo info) {
            ArrayList<String> fieldNames = info.getAllStructFieldNames();
            ArrayList<TypeInfo> fieldTypes = info.getAllStructFieldTypeInfos();
            fields = new ArrayList<StructField>(fieldNames.size());
            for (int i = 0; i < fieldNames.size(); ++i) {
                fields.add(new Field(fieldNames.get(i),
                        createObjectInspector(fieldTypes.get(i)), i));
            }
        }

        PigStructInspector(List<StructField> fields) {
            this.fields = fields;
        }

        @Override
        public List<StructField> getAllStructFieldRefs() {
            return fields;
        }

        @Override
        public StructField getStructFieldRef(String s) {
            // Linear scan; struct field counts are small.
            for (StructField field : fields) {
                if (field.getFieldName().equals(s)) {
                    return field;
                }
            }
            return null;
        }

        @Override
        public Object getStructFieldData(Object object, StructField field) {
            Object result = null;
            try {
                result = ((Tuple) object).get(((Field) field).offset);
            } catch (ExecException e) {
                throw new RuntimeException(e);
            }
            return result;
        }

        @Override
        public List<Object> getStructFieldsDataAsList(Object object) {
            return ((Tuple) object).getAll();
        }

        @Override
        public String getTypeName() {
            StringBuilder buffer = new StringBuilder();
            buffer.append("struct<");
            for (int i = 0; i < fields.size(); ++i) {
                StructField field = fields.get(i);
                if (i != 0) {
                    buffer.append(",");
                }
                buffer.append(field.getFieldName());
                buffer.append(":");
                buffer.append(field.getFieldObjectInspector().getTypeName());
            }
            buffer.append(">");
            return buffer.toString();
        }

        @Override
        public Category getCategory() {
            return Category.STRUCT;
        }

        @Override
        public boolean equals(Object o) {
            if (o == null || o.getClass() != getClass()) {
                return false;
            } else if (o == this) {
                return true;
            } else {
                // Equal when field names and inspectors match pairwise.
                List<StructField> other = ((PigStructInspector) o).fields;
                if (other.size() != fields.size()) {
                    return false;
                }
                for (int i = 0; i < fields.size(); ++i) {
                    StructField left = other.get(i);
                    StructField right = fields.get(i);
                    if (!(left.getFieldName().equals(right.getFieldName()) && left
                            .getFieldObjectInspector().equals(
                                    right.getFieldObjectInspector()))) {
                        return false;
                    }
                }
                return true;
            }
        }
    }

    /**
     * MapObjectInspector over a java.util.Map with String keys
     * (Pig map keys are always chararray).
     */
    static class PigMapObjectInspector implements MapObjectInspector {
        private ObjectInspector key;
        private ObjectInspector value;

        PigMapObjectInspector(MapTypeInfo info) {
            key = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
            value = createObjectInspector(info.getMapValueTypeInfo());
        }

        @Override
        public ObjectInspector getMapKeyObjectInspector() {
            return key;
        }

        @Override
        public ObjectInspector getMapValueObjectInspector() {
            return value;
        }

        @Override
        public Object getMapValueElement(Object map, Object key) {
            return ((Map) map).get(key);
        }

        @Override
        public Map<Object, Object> getMap(Object map) {
            return (Map) map;
        }

        @Override
        public int getMapSize(Object map) {
            return ((Map) map).size();
        }

        @Override
        public String getTypeName() {
            return "map<" + key.getTypeName() + "," + value.getTypeName() + ">";
        }

        @Override
        public Category getCategory() {
            return Category.MAP;
        }

        @Override
        public boolean equals(Object o) {
            if (o == null || o.getClass() != getClass()) {
                return false;
            } else if (o == this) {
                return true;
            } else {
                PigMapObjectInspector other = (PigMapObjectInspector) o;
                return other.key.equals(key) && other.value.equals(value);
            }
        }
    }

    /**
     * ListObjectInspector over a Pig DataBag. Single-field tuples are
     * transparently unwrapped to their item. {@link #getListElement} only
     * supports sequential access (it iterates the bag, caching the iterator)
     * and is NOT thread-safe due to that cached state.
     */
    static class PigListObjectInspector implements ListObjectInspector {
        private ObjectInspector child;
        private Object cachedObject;
        private int index;
        private Iterator<Tuple> iter;

        PigListObjectInspector(ListTypeInfo info) {
            child = createObjectInspector(info.getListElementTypeInfo());
        }

        @Override
        public ObjectInspector getListElementObjectInspector() {
            return child;
        }

        @Override
        public Object getListElement(Object list, int i) {
            // (Re)start iteration when a new bag is seen or reading restarts.
            if (i == 0 || list != cachedObject) {
                cachedObject = list;
                index = -1;
                DataBag db = (DataBag)list;
                iter = db.iterator();
            }
            if (i == index + 1) {
                index++;
                try {
                    Tuple t = iter.next();
                    // Unwrap single-item tuples back to the bare item.
                    if (t.size() == 1) {
                        return t.get(0);
                    } else {
                        return t;
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } else {
                throw new RuntimeException("Only sequential read is supported");
            }
        }

        @Override
        public int getListLength(Object list) {
            return (int)((DataBag)list).size();
        }

        @Override
        @SuppressWarnings("unchecked")
        public List<?> getList(Object list) {
            List<Object> result = new ArrayList<Object>();
            DataBag bag = (DataBag)list;
            for (Tuple t : bag) {
                if (t.size() == 1) {
                    try {
                        result.add(t.get(0));
                    } catch (ExecException e) {
                        throw new RuntimeException(e);
                    }
                } else {
                    result.add(t);
                }
            }
            return result;
        }

        @Override
        public String getTypeName() {
            return "array<" + child.getTypeName() + ">";
        }

        @Override
        public Category getCategory() {
            return Category.LIST;
        }

        @Override
        public boolean equals(Object o) {
            if (o == null || o.getClass() != getClass()) {
                return false;
            } else if (o == this) {
                return true;
            } else {
                ObjectInspector other = ((PigListObjectInspector) o).child;
                return other.equals(child);
            }
        }
    }

    /**
     * Binary inspector accepting either a Pig DataByteArray or a raw byte[].
     */
    static class PigDataByteArrayObjectInspector extends AbstractPrimitiveJavaObjectInspector
            implements BinaryObjectInspector {

        PigDataByteArrayObjectInspector() {
            super(TypeInfoFactory.binaryTypeInfo);
        }

        @Override
        public BytesWritable getPrimitiveWritableObject(Object o) {
            return o == null ? null : (o instanceof DataByteArray
                    ? new BytesWritable(((DataByteArray) o).get())
                    : new BytesWritable((byte[]) o));
        }

        @Override
        public byte[] getPrimitiveJavaObject(Object o) {
            // FIX: return null for null input, consistent with
            // getPrimitiveWritableObject (previously threw NPE).
            return o == null ? null : ((DataByteArray) o).get();
        }

    }

    /**
     * Timestamp inspector backed by Joda DateTime (Pig's datetime type).
     */
    static class PigJodaTimeStampObjectInspector extends
            AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector {

        protected PigJodaTimeStampObjectInspector() {
            super(TypeInfoFactory.timestampTypeInfo);
        }

        @Override
        public TimestampWritable getPrimitiveWritableObject(Object o) {
            return o == null ? null : new TimestampWritable(new Timestamp(((DateTime)o).getMillis()));
        }

        @Override
        public Timestamp getPrimitiveJavaObject(Object o) {
            return o == null ? null : new Timestamp(((DateTime)o).getMillis());
        }
    }

    /**
     * Decimal inspector accepting either BigDecimal or BigInteger input.
     */
    static class PigDecimalObjectInspector extends
            AbstractPrimitiveJavaObjectInspector implements HiveDecimalObjectInspector {

        protected PigDecimalObjectInspector() {
            super(TypeInfoFactory.decimalTypeInfo);
        }

        @Override
        public HiveDecimalWritable getPrimitiveWritableObject(Object o) {
            // FIX: null must be checked before instanceof; previously a null
            // input fell through to the BigInteger branch and NPE'd inside
            // HiveDecimal.create instead of returning null.
            if (o == null) {
                return null;
            }
            return (o instanceof BigDecimal)
                    ? new HiveDecimalWritable(HiveDecimal.create((BigDecimal) o))
                    : new HiveDecimalWritable(HiveDecimal.create((BigInteger) o));
        }

        @Override
        public HiveDecimal getPrimitiveJavaObject(Object o) {
            // FIX: same null-before-instanceof ordering as above.
            if (o == null) {
                return null;
            }
            return (o instanceof BigDecimal)
                    ? HiveDecimal.create((BigDecimal) o)
                    : HiveDecimal.create((BigInteger) o);
        }
    }

    /**
     * Builds an ObjectInspector that reads Pig data shaped per {@code info}.
     * DATE/VARCHAR/BYTE/SHORT are rejected here because they are never
     * produced when converting from Pig types (see {@link #getTypeInfo}).
     */
    public static ObjectInspector createObjectInspector(TypeInfo info) {
        switch (info.getCategory()) {
        case PRIMITIVE:
            switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) {
            case FLOAT:
                return PrimitiveObjectInspectorFactory.javaFloatObjectInspector;
            case DOUBLE:
                return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
            case BOOLEAN:
                return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
            case INT:
                return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
            case LONG:
                return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
            case STRING:
                return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
            case TIMESTAMP:
                return new PigJodaTimeStampObjectInspector();
            case DECIMAL:
                return new PigDecimalObjectInspector();
            case BINARY:
                return new PigDataByteArrayObjectInspector();
            case DATE:
            case VARCHAR:
            case BYTE:
            case SHORT:
                throw new IllegalArgumentException("Should never happen, " +
                        (((PrimitiveTypeInfo) info).getPrimitiveCategory()) +
                        "is not valid Pig primitive data type");
            default:
                throw new IllegalArgumentException("Unknown primitive type " +
                        ((PrimitiveTypeInfo) info).getPrimitiveCategory());
            }
        case STRUCT:
            return new PigStructInspector((StructTypeInfo) info);
        case MAP:
            return new PigMapObjectInspector((MapTypeInfo) info);
        case LIST:
            return new PigListObjectInspector((ListTypeInfo) info);
        default:
            throw new IllegalArgumentException("Unknown type " +
                    info.getCategory());
        }
    }

    /**
     * Wraps a constant Pig value in the matching Hive ConstantObjectInspector.
     * Only the scalar types produced by Pig constant expressions are
     * supported; anything else (including null) is rejected.
     *
     * @throws IllegalArgumentException for unsupported value types
     */
    public static ConstantObjectInspector getConstantObjectInspector(Object obj) {
        switch (DataType.findType(obj)) {
        case DataType.FLOAT:
            return new JavaConstantFloatObjectInspector((Float)obj);
        case DataType.DOUBLE:
            return new JavaConstantDoubleObjectInspector((Double)obj);
        case DataType.BOOLEAN:
            return new JavaConstantBooleanObjectInspector((Boolean)obj);
        case DataType.INTEGER:
            return new JavaConstantIntObjectInspector((Integer)obj);
        case DataType.LONG:
            return new JavaConstantLongObjectInspector((Long)obj);
        case DataType.CHARARRAY:
            return new JavaConstantStringObjectInspector((String)obj);
        default:
            // FIX: null-safe message; obj.getClass() NPE'd for null input,
            // masking the real "unsupported type" error.
            throw new IllegalArgumentException("Not implemented " +
                    (obj == null ? "null" : obj.getClass().getName()));
        }
    }
}
Modified: pig/trunk/src/org/apache/pig/impl/util/orc/OrcUtils.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/orc/OrcUtils.java?rev=1671956&r1=1671955&r2=1671956&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/orc/OrcUtils.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/orc/OrcUtils.java Tue Apr 7 21:24:48 2015 @@ -1,697 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.pig.impl.util.orc; - -import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hive.common.type.HiveChar; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.common.type.HiveVarchar; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.io.BytesWritable; -import org.apache.pig.PigWarning; -import org.apache.pig.ResourceSchema; -import 
org.apache.pig.ResourceSchema.ResourceFieldSchema; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.tools.pigstats.PigStatusReporter; -import org.joda.time.DateTime; - -public class OrcUtils { - public static Object convertOrcToPig(Object obj, ObjectInspector oi, boolean[] includedColumns) { - Object result = null; - if (obj == null) { - return result; - } - switch (oi.getCategory()) { - case PRIMITIVE: - PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi; - result = getPrimaryFromOrc(obj, poi); - break; - case STRUCT: - StructObjectInspector soi = (StructObjectInspector)oi; - List<StructField> elementFields = (List<StructField>) soi.getAllStructFieldRefs(); - List<Object> items = soi.getStructFieldsDataAsList(obj); - Tuple t = TupleFactory.getInstance().newTuple(); - for (int i=0;i<items.size();i++) { - if (includedColumns==null || includedColumns[i]) { - Object convertedItem = convertOrcToPig(items.get(i), elementFields.get(i).getFieldObjectInspector(), null); - t.append(convertedItem); - } - } - result = t; - break; - case MAP: - MapObjectInspector moi = (MapObjectInspector)oi; - ObjectInspector keyObjectInspector = moi.getMapKeyObjectInspector(); - ObjectInspector valueObjectInspector = moi.getMapValueObjectInspector(); - Map<Object, Object> m = (Map<Object, Object>)obj; - result = new HashMap(); - for (Map.Entry<Object, Object> entry : m.entrySet()) { - Object convertedKey = convertOrcToPig(entry.getKey(), keyObjectInspector, null); - Object convertedValue = convertOrcToPig(entry.getValue(), valueObjectInspector, null); - if (convertedKey!=null) { - ((Map)result).put(convertedKey.toString(), convertedValue); - } else { - PigStatusReporter reporter = PigStatusReporter.getInstance(); - 
if (reporter != null) { - reporter.incrCounter(PigWarning.UDF_WARNING_1, 1); - } - } - } - break; - case LIST: - ListObjectInspector loi = (ListObjectInspector)oi; - result = BagFactory.getInstance().newDefaultBag(); - ObjectInspector itemObjectInspector = loi.getListElementObjectInspector(); - for (Object item : loi.getList(obj)) { - Tuple convertedItem = (Tuple)convertOrcToPig(item, itemObjectInspector, null); - ((DataBag)result).add(convertedItem); - } - break; - default: - throw new IllegalArgumentException("Unknown type " + - oi.getCategory()); - } - return result; - } - - public static Object getPrimaryFromOrc(Object obj, PrimitiveObjectInspector poi) { - Object result = null; - if (obj == null) { - return result; - } - switch (poi.getPrimitiveCategory()) { - case FLOAT: - case DOUBLE: - case BOOLEAN: - case INT: - case LONG: - case STRING: - result = poi.getPrimitiveJavaObject(obj); - break; - case CHAR: - result = ((HiveChar)poi.getPrimitiveJavaObject(obj)).getValue(); - break; - case VARCHAR: - result = ((HiveVarchar)poi.getPrimitiveJavaObject(obj)).getValue(); - break; - case BYTE: - result = (int)(Byte)poi.getPrimitiveJavaObject(obj); - break; - case SHORT: - result = (int)(Short)poi.getPrimitiveJavaObject(obj); - break; - case BINARY: - BytesWritable bw = (BytesWritable) obj; - // Make a copy - result = new DataByteArray(bw.getBytes(), 0, bw.getLength()); - break; - case TIMESTAMP: - java.sql.Timestamp origTimeStamp = (java.sql.Timestamp)poi.getPrimitiveJavaObject(obj); - result = new DateTime(origTimeStamp.getTime()); - break; - case DATE: - java.sql.Date origDate = (java.sql.Date)poi.getPrimitiveJavaObject(obj); - result = new DateTime(origDate.getTime()); - break; - case DECIMAL: - org.apache.hadoop.hive.common.type.HiveDecimal origDecimal = - (org.apache.hadoop.hive.common.type.HiveDecimal)poi.getPrimitiveJavaObject(obj); - result = origDecimal.bigDecimalValue(); - break; - default: - throw new IllegalArgumentException("Unknown primitive type " + - 
(poi).getPrimitiveCategory()); - } - return result; - } - - public static ResourceFieldSchema getResourceFieldSchema(TypeInfo ti) throws IOException { - ResourceFieldSchema fieldSchema = new ResourceFieldSchema(); - ResourceFieldSchema[] innerFs; - ResourceSchema innerSchema; - switch (ti.getCategory()) { - case STRUCT: - StructTypeInfo sti = (StructTypeInfo)ti; - fieldSchema.setType(DataType.TUPLE); - List<TypeInfo> typeInfos = sti.getAllStructFieldTypeInfos(); - List<String> names = sti.getAllStructFieldNames(); - innerFs = new ResourceFieldSchema[typeInfos.size()]; - for (int i=0;i<typeInfos.size();i++) { - innerFs[i] = getResourceFieldSchema(typeInfos.get(i)); - innerFs[i].setName(names.get(i)); - } - innerSchema = new ResourceSchema(); - innerSchema.setFields(innerFs); - fieldSchema.setSchema(innerSchema); - break; - case LIST: - ListTypeInfo lti = (ListTypeInfo)ti; - fieldSchema.setType(DataType.BAG); - innerFs = new ResourceFieldSchema[1]; - innerFs[0] = getResourceFieldSchema(lti.getListElementTypeInfo()); - innerSchema = new ResourceSchema(); - innerSchema.setFields(innerFs); - fieldSchema.setSchema(innerSchema); - break; - case MAP: - MapTypeInfo mti = (MapTypeInfo)ti; - fieldSchema.setType(DataType.MAP); - innerFs = new ResourceFieldSchema[1]; - innerFs[0] = getResourceFieldSchema(mti.getMapValueTypeInfo()); - innerSchema = new ResourceSchema(); - innerSchema.setFields(innerFs); - fieldSchema.setSchema(innerSchema); - break; - case PRIMITIVE: - switch (((PrimitiveTypeInfo)ti).getPrimitiveCategory()) { - case FLOAT: - fieldSchema.setType(DataType.FLOAT); - break; - case DOUBLE: - fieldSchema.setType(DataType.DOUBLE); - break; - case BOOLEAN: - fieldSchema.setType(DataType.BOOLEAN); - break; - case BYTE: - fieldSchema.setType(DataType.INTEGER); - break; - case SHORT: - fieldSchema.setType(DataType.INTEGER); - break; - case INT: - fieldSchema.setType(DataType.INTEGER); - break; - case LONG: - fieldSchema.setType(DataType.LONG); - break; - case BINARY: - 
fieldSchema.setType(DataType.BYTEARRAY); - break; - case STRING: - fieldSchema.setType(DataType.CHARARRAY); - break; - case VARCHAR: - fieldSchema.setType(DataType.CHARARRAY); - break; - case CHAR: - fieldSchema.setType(DataType.CHARARRAY); - break; - case TIMESTAMP: - fieldSchema.setType(DataType.DATETIME); - break; - case DATE: - fieldSchema.setType(DataType.DATETIME); - break; - case DECIMAL: - fieldSchema.setType(DataType.BIGDECIMAL); - break; - default: - throw new IllegalArgumentException("Unknown primitive type " + - ((PrimitiveTypeInfo)ti).getPrimitiveCategory()); - } - break; - } - - return fieldSchema; - } - - public static TypeInfo getTypeInfo(ResourceFieldSchema fs) throws IOException { - TypeInfo ti; - switch (fs.getType()) { - case DataType.TUPLE: - ti = new StructTypeInfo(); - ArrayList<String> names = new ArrayList<String>(); - ArrayList<TypeInfo> typeInfos = new ArrayList<TypeInfo>(); - for (ResourceFieldSchema subFs : fs.getSchema().getFields()) { - TypeInfo info = getTypeInfo(subFs); - names.add(subFs.getName()); - typeInfos.add(info); - } - ((StructTypeInfo)ti).setAllStructFieldNames(names); - ((StructTypeInfo)ti).setAllStructFieldTypeInfos(typeInfos); - break; - case DataType.BAG: - ti = new ListTypeInfo(); - if (fs.getSchema()==null || fs.getSchema().getFields().length!=1) { - throw new IOException("Wrong bag inner schema"); - } - TypeInfo elementField = getTypeInfo(fs.getSchema().getFields()[0]); - ((ListTypeInfo)ti).setListElementTypeInfo(elementField); - break; - case DataType.MAP: - ti = new MapTypeInfo(); - TypeInfo valueField; - if (fs.getSchema() == null || fs.getSchema().getFields().length != 1) { - valueField = TypeInfoFactory.binaryTypeInfo; - } else { - valueField = getTypeInfo(fs.getSchema().getFields()[0]); - } - ((MapTypeInfo)ti).setMapKeyTypeInfo(TypeInfoFactory.stringTypeInfo); - ((MapTypeInfo)ti).setMapValueTypeInfo(valueField); - break; - case DataType.BOOLEAN: - ti = TypeInfoFactory.booleanTypeInfo; - break; - case 
DataType.INTEGER: - ti = TypeInfoFactory.intTypeInfo; - break; - case DataType.LONG: - ti = TypeInfoFactory.longTypeInfo; - break; - case DataType.FLOAT: - ti = TypeInfoFactory.floatTypeInfo; - break; - case DataType.DOUBLE: - ti = TypeInfoFactory.doubleTypeInfo; - break; - case DataType.CHARARRAY: - ti = TypeInfoFactory.stringTypeInfo; - break; - case DataType.DATETIME: - ti = TypeInfoFactory.timestampTypeInfo; - break; - case DataType.BIGDECIMAL: - ti = TypeInfoFactory.decimalTypeInfo; - break; - case DataType.BIGINTEGER: - ti = TypeInfoFactory.decimalTypeInfo; - break; - case DataType.BYTEARRAY: - ti = TypeInfoFactory.binaryTypeInfo; - break; - default: - throw new IllegalArgumentException("Unknown data type " + - DataType.findTypeName(fs.getType())); - } - return ti; - } - - static class Field implements StructField { - private final String name; - private final ObjectInspector inspector; - private final int offset; - - Field(String name, ObjectInspector inspector, int offset) { - this.name = name; - this.inspector = inspector; - this.offset = offset; - } - - @Override - public String getFieldName() { - return name; - } - - @Override - public ObjectInspector getFieldObjectInspector() { - return inspector; - } - - @Override - public int getFieldID() { - return offset; - } - - @Override - public String getFieldComment() { - return null; - } - } - - static class PigStructInspector extends StructObjectInspector { - private List<StructField> fields; - - PigStructInspector(StructTypeInfo info) { - ArrayList<String> fieldNames = info.getAllStructFieldNames(); - ArrayList<TypeInfo> fieldTypes = info.getAllStructFieldTypeInfos(); - fields = new ArrayList<StructField>(fieldNames.size()); - for (int i = 0; i < fieldNames.size(); ++i) { - fields.add(new Field(fieldNames.get(i), - createObjectInspector(fieldTypes.get(i)), i)); - } - } - - @Override - public List<StructField> getAllStructFieldRefs() { - return fields; - } - - @Override - public StructField 
getStructFieldRef(String s) { - for (StructField field : fields) { - if (field.getFieldName().equals(s)) { - return field; - } - } - return null; - } - - @Override - public Object getStructFieldData(Object object, StructField field) { - Object result = null; - try { - result = ((Tuple) object).get(((Field) field).offset); - } catch (ExecException e) { - throw new RuntimeException(e); - } - return result; - } - - @Override - public List<Object> getStructFieldsDataAsList(Object object) { - return ((Tuple) object).getAll(); - } - - @Override - public String getTypeName() { - StringBuilder buffer = new StringBuilder(); - buffer.append("struct<"); - for (int i = 0; i < fields.size(); ++i) { - StructField field = fields.get(i); - if (i != 0) { - buffer.append(","); - } - buffer.append(field.getFieldName()); - buffer.append(":"); - buffer.append(field.getFieldObjectInspector().getTypeName()); - } - buffer.append(">"); - return buffer.toString(); - } - - @Override - public Category getCategory() { - return Category.STRUCT; - } - - @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != getClass()) { - return false; - } else if (o == this) { - return true; - } else { - List<StructField> other = ((PigStructInspector) o).fields; - if (other.size() != fields.size()) { - return false; - } - for (int i = 0; i < fields.size(); ++i) { - StructField left = other.get(i); - StructField right = fields.get(i); - if (!(left.getFieldName().equals(right.getFieldName()) && left - .getFieldObjectInspector().equals( - right.getFieldObjectInspector()))) { - return false; - } - } - return true; - } - } - } - - static class PigMapObjectInspector implements MapObjectInspector { - private ObjectInspector key; - private ObjectInspector value; - - PigMapObjectInspector(MapTypeInfo info) { - key = createObjectInspector(info.getMapKeyTypeInfo()); - value = createObjectInspector(info.getMapValueTypeInfo()); - } - - @Override - public ObjectInspector getMapKeyObjectInspector() 
{ - return key; - } - - @Override - public ObjectInspector getMapValueObjectInspector() { - return value; - } - - @Override - public Object getMapValueElement(Object map, Object key) { - return ((Map) map).get(key); - } - - @Override - public Map<Object, Object> getMap(Object map) { - return (Map) map; - } - - @Override - public int getMapSize(Object map) { - return ((Map) map).size(); - } - - @Override - public String getTypeName() { - return "map<" + key.getTypeName() + "," + value.getTypeName() + ">"; - } - - @Override - public Category getCategory() { - return Category.MAP; - } - - @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != getClass()) { - return false; - } else if (o == this) { - return true; - } else { - PigMapObjectInspector other = (PigMapObjectInspector) o; - return other.key.equals(key) && other.value.equals(value); - } - } - } - - static class PigListObjectInspector implements ListObjectInspector { - private ObjectInspector child; - private Object cachedObject; - private int index; - private Iterator<Tuple> iter; - - PigListObjectInspector(ListTypeInfo info) { - child = createObjectInspector(info.getListElementTypeInfo()); - } - - @Override - public ObjectInspector getListElementObjectInspector() { - return child; - } - - @Override - public Object getListElement(Object list, int i) { - if (list!=cachedObject) { - cachedObject = list; - index = -1; - DataBag db = (DataBag)list; - iter = db.iterator(); - } - if (i==index+1) { - index++; - return iter.next(); - } else { - throw new RuntimeException("Only sequential read is supported"); - } - } - - @Override - public int getListLength(Object list) { - return (int)((DataBag)list).size(); - } - - @Override - @SuppressWarnings("unchecked") - public List<?> getList(Object list) { - List<Object> result = new ArrayList<Object>(); - DataBag bag = (DataBag)list; - for (Tuple t : bag) { - result.add(t); - } - return result; - } - - @Override - public String getTypeName() { - 
return "array<" + child.getTypeName() + ">"; - } - - @Override - public Category getCategory() { - return Category.LIST; - } - - @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != getClass()) { - return false; - } else if (o == this) { - return true; - } else { - ObjectInspector other = ((PigListObjectInspector) o).child; - return other.equals(child); - } - } - } - - static class PigDataByteArrayObjectInspector extends AbstractPrimitiveJavaObjectInspector - implements BinaryObjectInspector { - - PigDataByteArrayObjectInspector() { - super(TypeInfoFactory.binaryTypeInfo); - } - - @Override - public BytesWritable getPrimitiveWritableObject(Object o) { - return o == null ? null : (o instanceof DataByteArray - ? new BytesWritable(((DataByteArray) o).get()) - : new BytesWritable((byte[]) o)); - } - - @Override - public byte[] getPrimitiveJavaObject(Object o) { - return ((DataByteArray) o).get(); - } - - } - - static class PigJodaTimeStampObjectInspector extends - AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector { - - protected PigJodaTimeStampObjectInspector() { - super(TypeInfoFactory.timestampTypeInfo); - } - - @Override - public TimestampWritable getPrimitiveWritableObject(Object o) { - return o == null ? null : new TimestampWritable(new Timestamp(((DateTime)o).getMillis())); - } - - @Override - public Timestamp getPrimitiveJavaObject(Object o) { - return o == null ? null : new Timestamp(((DateTime)o).getMillis()); - } - } - - static class PigDecimalObjectInspector extends - AbstractPrimitiveJavaObjectInspector implements HiveDecimalObjectInspector { - - protected PigDecimalObjectInspector() { - super(TypeInfoFactory.decimalTypeInfo); - } - - @Override - public HiveDecimalWritable getPrimitiveWritableObject(Object o) { - if (o instanceof BigDecimal) { - return o == null ? null : new HiveDecimalWritable(HiveDecimal.create((BigDecimal)o)); - } else { // BigInteger - return o == null ? 
null : new HiveDecimalWritable(HiveDecimal.create((BigInteger)o)); - } - } - - @Override - public HiveDecimal getPrimitiveJavaObject(Object o) { - if (o instanceof BigDecimal) { - return o == null ? null : HiveDecimal.create((BigDecimal)o); - } else { // BigInteger - return o == null ? null : HiveDecimal.create((BigInteger)o); - } - } - } - - public static ObjectInspector createObjectInspector(TypeInfo info) { - switch (info.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) { - case FLOAT: - return PrimitiveObjectInspectorFactory.javaFloatObjectInspector; - case DOUBLE: - return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; - case BOOLEAN: - return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector; - case INT: - return PrimitiveObjectInspectorFactory.javaIntObjectInspector; - case LONG: - return PrimitiveObjectInspectorFactory.javaLongObjectInspector; - case STRING: - return PrimitiveObjectInspectorFactory.javaStringObjectInspector; - case TIMESTAMP: - return new PigJodaTimeStampObjectInspector(); - case DECIMAL: - return new PigDecimalObjectInspector(); - case BINARY: - return new PigDataByteArrayObjectInspector(); - case DATE: - case VARCHAR: - case BYTE: - case SHORT: - throw new IllegalArgumentException("Should never happen, " + - (((PrimitiveTypeInfo) info).getPrimitiveCategory()) + - "is not valid Pig primitive data type"); - default: - throw new IllegalArgumentException("Unknown primitive type " + - ((PrimitiveTypeInfo) info).getPrimitiveCategory()); - } - case STRUCT: - return new PigStructInspector((StructTypeInfo) info); - case MAP: - return new PigMapObjectInspector((MapTypeInfo) info); - case LIST: - return new PigListObjectInspector((ListTypeInfo) info); - default: - throw new IllegalArgumentException("Unknown type " + - info.getCategory()); - } - } -} Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1671956&r1=1671955&r2=1671956&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Tue Apr 7 21:24:48 2015 @@ -527,11 +527,16 @@ public class ExpToPhyTranslationVisitor //reinitialize input schema from signature if (((POUserFunc)p).getFunc().getInputSchema() == null) { ((POUserFunc)p).setFuncInputSchema(op.getSignature()); + ((EvalFunc) f).setInputSchema(((POUserFunc)p).getFunc().getInputSchema()); } List<String> cacheFiles = ((EvalFunc)f).getCacheFiles(); if (cacheFiles != null) { ((POUserFunc)p).setCacheFiles(cacheFiles); } + List<String> shipFiles = ((EvalFunc)f).getShipFiles(); + if (shipFiles != null) { + ((POUserFunc)p).setShipFiles(shipFiles); + } } else { p = new POUserComparisonFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen .getNextNodeId(DEFAULT_SCOPE)), -1, Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1671956&r1=1671955&r2=1671956&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Tue Apr 7 21:24:48 2015 @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.builtin.InvokerGenerator; @@ -241,10 +242,20 @@ public class UserFuncExpression extends } 
ef.setUDFContextSignature(signature); - Properties props = UDFContext.getUDFContext().getUDFProperties(ef.getClass()); Schema translatedInputSchema = Util.translateSchema(inputSchema); if(translatedInputSchema != null) { + Properties props = UDFContext.getUDFContext().getUDFProperties(ef.getClass()); props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema); + if (ef instanceof Algebraic) { + // In case of Algebraic func, set original inputSchema to Initial, + // Intermed, Final + for (String func : new String[]{((Algebraic)ef).getInitial(), + ((Algebraic)ef).getIntermed(), ((Algebraic)ef).getFinal()}) { + Class c = PigContext.instantiateFuncFromSpec(new FuncSpec(func)).getClass(); + props = UDFContext.getUDFContext().getUDFProperties(c); + props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema); + } + } } // Store inputSchema into the UDF context ef.setInputSchema(translatedInputSchema); Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1671956&r1=1671955&r2=1671956&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Tue Apr 7 21:24:48 2015 @@ -422,7 +422,7 @@ public class LogToPhyTranslationVisitor poSort = new POSort(new OperatorKey(scope, nodeGen .getNextNodeId(scope)), -1, null, newPhysicalPlan, newOrderPlan, null); - poSort.setRequestedParallelism(loRank.getRequestedParallelism()); + //poSort.setRequestedParallelism(loRank.getRequestedParallelism()); poSort.addOriginalLocation(loRank.getAlias(), loRank.getLocation()); Modified: pig/trunk/test/e2e/pig/build.xml URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/build.xml?rev=1671956&r1=1671955&r2=1671956&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/build.xml (original) +++ pig/trunk/test/e2e/pig/build.xml Tue Apr 7 21:24:48 2015 @@ -31,6 +31,8 @@ <equals arg1="${hadoopversion}" arg2="23" /> </condition> + <property name="mvnrepo" value="http://repo2.maven.org/maven2"/> + <!-- Separate property name for udfs' build.xml --> <property name="udf.dir" value="${basedir}/udfs"/> <property name="udf.java.dir" value="${udf.dir}/java"/> @@ -155,7 +157,7 @@ </target> <!-- Build an archive to use in the tests --> - <target name="tar" description="Create tar file with pig modules"> + <target name="tar" description="Create tar file with pig modules" depends="download-datafu"> <mkdir dir="${tar.dir}"/> <mkdir dir="${tar.dir}/tests"/> <mkdir dir="${tar.dir}/drivers"/> @@ -408,6 +410,12 @@ <ant dir="${udf.java.dir}" target="clean"/> </target> + <target name="download-datafu" description="To download datafu" unless="offline"> + <mkdir dir="lib/java"/> + <get src="${mvnrepo}/com/linkedin/datafu/datafu/1.2.0/datafu-1.2.0.jar" + dest="lib/java/datafu.jar"/> + </target> + </project> Modified: pig/trunk/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1671956&r1=1671955&r2=1671956&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/nightly.conf (original) +++ pig/trunk/test/e2e/pig/tests/nightly.conf Tue Apr 7 21:24:48 2015 @@ -5621,6 +5621,124 @@ store a into ':OUTPATH:';\, \, } ] + }, + { + 'name' => 'HiveUDF', + 'tests' => [ + { + # HiveUDF extends UDF + 'num' => 1, + 'pig' => q\ + define sin HiveUDF('sin'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate sin(gpa); + store B into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A 
= LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate SIN(gpa); + store B into ':OUTPATH:';\, + }, + { + # HiveUDF extends GenericUDF + 'num' => 2, + 'pig' => q\ + define upper HiveUDF('upper'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate upper(name); + store B into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate UPPER(name); + store B into ':OUTPATH:';\, + }, + { + # HiveUDTF + 'num' => 3, + 'pig' => q\ + define explode HiveUDTF('explode'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:chararray, gpa:chararray); + B = foreach A generate TOBAG(name, age, gpa) as b; + C = foreach B generate flatten(explode(b)); + store C into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:chararray, gpa:chararray); + B = foreach A generate TOBAG(name, age, gpa) as b; + C = foreach B generate flatten(b); + store C into ':OUTPATH:';\, + }, + { + # HiveUDAF extends GenericUDAF, with null handling + 'num' => 4, + 'pig' => q\ + define avg HiveUDAF('avg'); + A = LOAD ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double); + B = group A by name; + C = foreach B generate group, avg(A.age); + store C into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double); + B = group A by name; + C = foreach B generate group, AVG(A.age); + store C into ':OUTPATH:';\, + }, + { + # HiveUDAF extends UDAF + 'num' => 5, + 'pig' => q\ + define percentile HiveUDAF('percentile'); + A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate name, age, 0.5 as perc; + C = group B by name; + D = foreach C generate group, 
percentile(B.(age, perc)); + store D into ':OUTPATH:';\, + 'verify_pig_script' => q\ + register :FUNCPATH:/datafu.jar + define Quartile datafu.pig.stats.Quantile('0.5'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = group A by name; + C = foreach B { + sorted = order A by age; + generate group, flatten(Quartile(sorted.age)); + } + store C into ':OUTPATH:';\, + }, + { + # Constant folding and ship jars + 'num' => 6, + 'pig' => q# + sh echo -e "zach young\nzach zipper" > names.txt + define in_file HiveUDF('in_file', '(null, "names.txt")'); + A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate in_file(name, 'names.txt'); + store B into ':OUTPATH:';#, + 'verify_pig_script' => q#register :PIGGYBANKJAR: + sh echo -e "zach young\nzach zipper" > names.txt + rmf :INPATH:/singlefile/names.txt + fs -put names.txt :INPATH:/singlefile/names.txt + define LookupInFiles org.apache.pig.piggybank.evaluation.string.LookupInFiles(); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate LookupInFiles(name, ':INPATH:/singlefile/names.txt'); + C = foreach B generate (boolean)$0; + store C into ':OUTPATH:'; + fs -rm :INPATH:/singlefile/names.txt# + }, + { + # Custom Hive UDF and MapredContext + 'num' => 7, + 'pig' => q\set mapred.max.split.size '100000000' + register :FUNCPATH:/testudf.jar; + define DummyContextUDF HiveUDF('org.apache.pig.test.udf.evalfunc.DummyContextUDF'); + A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate DummyContextUDF(); + store B into ':OUTPATH:';\, + 'verify_pig_script' => q\set mapred.max.split.size '100000000' + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate UniqueID(); + C = foreach B generate (int)SUBSTRING($0, 2, 100); + D = foreach C generate (chararray)($0+1); 
+ store D into ':OUTPATH:';\ + } + ] } ], }, Added: pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java?rev=1671956&view=auto ============================================================================== --- pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java (added) +++ pig/trunk/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java Tue Apr 7 21:24:48 2015 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.test.udf.evalfunc; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Reporter; +import org.apache.pig.impl.util.UDFContext; + +@Description(name = "dummycontextudf", +value = "_FUNC_(col) - UDF to report MR counter values") +public class DummyContextUDF extends GenericUDF { + + private MapredContext context; + private LongWritable result = new LongWritable(); + + public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { + return PrimitiveObjectInspectorFactory.writableLongObjectInspector; + } + + public Object evaluate(DeferredObject[] arguments) throws HiveException { + Reporter reporter = context.getReporter(); + Counters.Counter counter; + if (UDFContext.getUDFContext().getJobConf().get("exectype").equals("TEZ")) { + counter = reporter.getCounter("org.apache.tez.common.counters.TaskCounter", "INPUT_RECORDS_PROCESSED"); + } else { + counter = reporter.getCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS"); + } + result.set(counter.getValue()); + return result; + } + + public String getDisplayString(String[] children) { + return "dummy-func()"; + } + + @Override + public void configure(MapredContext context) { + this.context = context; + } +} +
