Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1383152&r1=1383151&r2=1383152&view=diff ============================================================================== --- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java (original) +++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java Mon Sep 10 23:28:55 2012 @@ -56,388 +56,388 @@ import org.apache.pig.impl.util.UDFConte public class PigHCatUtil { - static final int PIG_EXCEPTION_CODE = 1115; // http://wiki.apache.org/pig/PigErrorHandlingFunctionalSpecification#Error_codes - private static final String DEFAULT_DB = MetaStoreUtils.DEFAULT_DATABASE_NAME; + static final int PIG_EXCEPTION_CODE = 1115; // http://wiki.apache.org/pig/PigErrorHandlingFunctionalSpecification#Error_codes + private static final String DEFAULT_DB = MetaStoreUtils.DEFAULT_DATABASE_NAME; - private final Map<Pair<String,String>, Table> hcatTableCache = - new HashMap<Pair<String,String>, Table>(); + private final Map<Pair<String, String>, Table> hcatTableCache = + new HashMap<Pair<String, String>, Table>(); - private static final TupleFactory tupFac = TupleFactory.getInstance(); + private static final TupleFactory tupFac = TupleFactory.getInstance(); - static public Pair<String, String> getDBTableNames(String location) throws IOException { - // the location string will be of the form: - // <database name>.<table name> - parse it and - // communicate the information to HCatInputFormat - - try { - return HCatUtil.getDbAndTableName(location); - } catch (IOException e) { - String locationErrMsg = "The input location in load statement " + - "should be of the form " + - "<databasename>.<table name> or <table name>. 
Got " + location; - throw new PigException(locationErrMsg, PIG_EXCEPTION_CODE); - } - } - - static public String getHCatServerUri(Job job) { - - return job.getConfiguration().get(HiveConf.ConfVars.METASTOREURIS.varname); - } - - static public String getHCatServerPrincipal(Job job) { - - return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_PRINCIPAL); - } - - private static HiveMetaStoreClient getHiveMetaClient(String serverUri, - String serverKerberosPrincipal, Class<?> clazz) throws Exception { - HiveConf hiveConf = new HiveConf(clazz); - - if (serverUri != null){ - hiveConf.set("hive.metastore.local", "false"); - hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim()); - } - - if (serverKerberosPrincipal != null){ - hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true); - hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal); - } - - try { - return HCatUtil.getHiveClient(hiveConf); - } catch (Exception e){ - throw new Exception("Could not instantiate a HiveMetaStoreClient connecting to server uri:["+serverUri+"]",e); - } - } - - - HCatSchema getHCatSchema(List<RequiredField> fields, String signature, Class<?> classForUDFCLookup) throws IOException { - if(fields == null) { - return null; - } - - Properties props = UDFContext.getUDFContext().getUDFProperties( - classForUDFCLookup, new String[] {signature}); - HCatSchema hcatTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA); - - ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>(); - for(RequiredField rf: fields) { - fcols.add(hcatTableSchema.getFields().get(rf.getIndex())); - } - return new HCatSchema(fcols); - } - - public Table getTable(String location, String hcatServerUri, String hcatServerPrincipal) throws IOException{ - Pair<String, String> loc_server = new Pair<String,String>(location, hcatServerUri); - Table hcatTable = hcatTableCache.get(loc_server); - if(hcatTable != null){ - return hcatTable; - } - - Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location); - String dbName = dbTablePair.first; - String tableName = dbTablePair.second; - Table table = null; - HiveMetaStoreClient client = null; - try { - client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class); - table = HCatUtil.getTable(client, dbName, tableName); - } catch (NoSuchObjectException nsoe){ - throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend - } catch (Exception e) { - throw new IOException(e); - } finally { - HCatUtil.closeHiveClientQuietly(client); - } - hcatTableCache.put(loc_server, table); - return table; - } - - public static ResourceSchema getResourceSchema(HCatSchema hcatSchema) throws IOException { - - List<ResourceFieldSchema> rfSchemaList = new ArrayList<ResourceFieldSchema>(); - for (HCatFieldSchema hfs : hcatSchema.getFields()){ - ResourceFieldSchema rfSchema; - rfSchema = getResourceSchemaFromFieldSchema(hfs); - rfSchemaList.add(rfSchema); - } - ResourceSchema rSchema = new ResourceSchema(); - rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0])); - return rSchema; - - } - - private static ResourceFieldSchema getResourceSchemaFromFieldSchema(HCatFieldSchema hfs) - throws IOException { - ResourceFieldSchema rfSchema; - // if we are dealing with a bag or tuple column - need to worry about subschema - if(hfs.getType() == Type.STRUCT) { - rfSchema = new ResourceFieldSchema() - .setName(hfs.getName()) - 
.setDescription(hfs.getComment()) - .setType(getPigType( hfs)) - .setSchema(getTupleSubSchema(hfs)); - } else if(hfs.getType() == Type.ARRAY) { - rfSchema = new ResourceFieldSchema() - .setName(hfs.getName()) - .setDescription(hfs.getComment()) - .setType(getPigType( hfs)) - .setSchema(getBagSubSchema(hfs)); - } else { - rfSchema = new ResourceFieldSchema() - .setName(hfs.getName()) - .setDescription(hfs.getComment()) - .setType(getPigType( hfs)) - .setSchema(null); // no munging inner-schemas - } - return rfSchema; - } - - protected static ResourceSchema getBagSubSchema(HCatFieldSchema hfs) throws IOException { - // there are two cases - array<Type> and array<struct<...>> - // in either case the element type of the array is represented in a - // tuple field schema in the bag's field schema - the second case (struct) - // more naturally translates to the tuple - in the first case (array<Type>) - // we simulate the tuple by putting the single field in a tuple - - Properties props = UDFContext.getUDFContext().getClientSystemProps(); - String innerTupleName = HCatConstants.HCAT_PIG_INNER_TUPLE_NAME_DEFAULT; - if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)) { - innerTupleName = props.getProperty(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME) - .replaceAll("FIELDNAME", hfs.getName()); - } - String innerFieldName = HCatConstants.HCAT_PIG_INNER_FIELD_NAME_DEFAULT; - if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)) { - innerFieldName = props.getProperty(HCatConstants.HCAT_PIG_INNER_FIELD_NAME) - .replaceAll("FIELDNAME", hfs.getName()); - } - - ResourceFieldSchema[] bagSubFieldSchemas = new ResourceFieldSchema[1]; - bagSubFieldSchemas[0] = new ResourceFieldSchema().setName(innerTupleName) - .setDescription("The tuple in the bag") - .setType(DataType.TUPLE); - HCatFieldSchema arrayElementFieldSchema = hfs.getArrayElementSchema().get(0); - if(arrayElementFieldSchema.getType() == Type.STRUCT) { - bagSubFieldSchemas[0].setSchema(getTupleSubSchema(arrayElementFieldSchema)); - } else if(arrayElementFieldSchema.getType() == Type.ARRAY) { - ResourceSchema s = new ResourceSchema(); - List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema)); - s.setFields(lrfs.toArray(new ResourceFieldSchema[0])); - bagSubFieldSchemas[0].setSchema(s); - } else { - ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1]; - innerTupleFieldSchemas[0] = new ResourceFieldSchema().setName(innerFieldName) - .setDescription("The inner field in the tuple in the bag") - .setType(getPigType(arrayElementFieldSchema)) - .setSchema(null); // the element type is not a tuple - so no subschema - bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas)); - } - ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas); - return s; - - } - - private static ResourceSchema getTupleSubSchema(HCatFieldSchema hfs) throws IOException { - // for each struct subfield, create equivalent ResourceFieldSchema - ResourceSchema s = new ResourceSchema(); - List<ResourceFieldSchema> lrfs = new ArrayList<ResourceFieldSchema>(); - for(HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) { - lrfs.add(getResourceSchemaFromFieldSchema(subField)); - } - s.setFields(lrfs.toArray(new ResourceFieldSchema[0])); - return s; - } - -/** - * @param hfs the field schema of the column - * @return corresponding pig type - * @throws IOException - */ - static public byte getPigType(HCatFieldSchema hfs) 
throws IOException { - return getPigType(hfs.getType()); - } - - static public byte getPigType(Type type) throws IOException { - String errMsg; - - if (type == Type.STRING){ - return DataType.CHARARRAY; - } - - if ( (type == Type.INT) || (type == Type.SMALLINT) || (type == Type.TINYINT)){ - return DataType.INTEGER; - } - - if (type == Type.ARRAY){ - return DataType.BAG; - } - - if (type == Type.STRUCT){ - return DataType.TUPLE; - } - - if (type == Type.MAP){ - return DataType.MAP; - } - - if (type == Type.BIGINT){ - return DataType.LONG; - } - - if (type == Type.FLOAT){ - return DataType.FLOAT; - } - - if (type == Type.DOUBLE){ - return DataType.DOUBLE; - } - - if (type == Type.BINARY){ - return DataType.BYTEARRAY; - } - - if (type == Type.BOOLEAN){ - errMsg = "HCatalog column type 'BOOLEAN' is not supported in " + - "Pig as a column type"; - throw new PigException(errMsg, PIG_EXCEPTION_CODE); - } - - errMsg = "HCatalog column type '"+ type.toString() +"' is not supported in Pig as a column type"; - throw new PigException(errMsg, PIG_EXCEPTION_CODE); - } - - public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Exception { - if (hr == null){ - return null; - } - return transformToTuple(hr.getAll(),hs); - } - - @SuppressWarnings("unchecked") - public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception { - Object result; - Type itemType = hfs.getType(); - switch (itemType){ - case BINARY: - result = (o == null) ? null : new DataByteArray((byte[])o); - break; - case STRUCT: - result = transformToTuple((List<Object>)o,hfs); - break; - case ARRAY: - result = transformToBag((List<? extends Object>) o,hfs); - break; - case MAP: - result = transformToPigMap((Map<String, Object>)o,hfs); - break; - default: - result = o; - break; - } - return result; - } - - public static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception { - try { - return transformToTuple(objList,hfs.getStructSubSchema()); - } catch (Exception e){ - if (hfs.getType() != Type.STRUCT){ - throw new Exception("Expected Struct type, got "+hfs.getType(), e); - } else { - throw e; - } - } - } - - public static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception { - if (objList == null){ - return null; - } - Tuple t = tupFac.newTuple(objList.size()); - List<HCatFieldSchema> subFields = hs.getFields(); - for (int i = 0; i < subFields.size(); i++){ - t.set(i,extractPigObject(objList.get(i), subFields.get(i))); - } - return t; - } - - public static Map<String,Object> transformToPigMap(Map<String,Object> map, HCatFieldSchema hfs) throws Exception { - if (map == null) { - return null; - } - - Map<String,Object> result = new HashMap<String, Object>(); - for (Entry<String, Object> entry : map.entrySet()) { - result.put(entry.getKey(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0))); - } - return result; - } - - @SuppressWarnings("unchecked") - public static DataBag transformToBag(List<? 
extends Object> list, HCatFieldSchema hfs) throws Exception { - if (list == null){ - return null; - } - - HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0); - DataBag db = new DefaultDataBag(); - for (Object o : list){ - Tuple tuple; - if (elementSubFieldSchema.getType() == Type.STRUCT){ - tuple = transformToTuple((List<Object>)o, elementSubFieldSchema); - } else { - // bags always contain tuples - tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema)); - } - db.add(tuple); - } - return db; - } - - - private static void validateHCatSchemaFollowsPigRules(HCatSchema tblSchema) throws PigException { - for(HCatFieldSchema hcatField : tblSchema.getFields()){ - validateHcatFieldFollowsPigRules(hcatField); - } - } - - private static void validateHcatFieldFollowsPigRules(HCatFieldSchema hcatField) throws PigException { - try { - Type hType = hcatField.getType(); - switch(hType){ - case BOOLEAN: - throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE); - case ARRAY: - validateHCatSchemaFollowsPigRules(hcatField.getArrayElementSchema()); - break; - case STRUCT: - validateHCatSchemaFollowsPigRules(hcatField.getStructSubSchema()); - break; - case MAP: - // key is only string - validateHCatSchemaFollowsPigRules(hcatField.getMapValueSchema()); - break; - } - } catch (HCatException e) { - throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE, e); - } - } - - - public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException { - validateHCatSchemaFollowsPigRules(hcatTableSchema); - } - - public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) { - if(p.getProperty(propName) != null){ - config.set(propName, p.getProperty(propName)); - } - } - - public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) { - if(config.get(propName) != null){ - p.setProperty(propName, config.get(propName)); + static public Pair<String, String> getDBTableNames(String location) throws IOException { + // the location string will be of the form: + // <database name>.<table name> - parse it and + // communicate the information to HCatInputFormat + + try { + return HCatUtil.getDbAndTableName(location); + } catch (IOException e) { + String locationErrMsg = "The input location in load statement " + + "should be of the form " + + "<databasename>.<table name> or <table name>. 
Got " + location; + throw new PigException(locationErrMsg, PIG_EXCEPTION_CODE); + } + } + + static public String getHCatServerUri(Job job) { + + return job.getConfiguration().get(HiveConf.ConfVars.METASTOREURIS.varname); + } + + static public String getHCatServerPrincipal(Job job) { + + return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_PRINCIPAL); + } + + private static HiveMetaStoreClient getHiveMetaClient(String serverUri, + String serverKerberosPrincipal, Class<?> clazz) throws Exception { + HiveConf hiveConf = new HiveConf(clazz); + + if (serverUri != null) { + hiveConf.set("hive.metastore.local", "false"); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim()); + } + + if (serverKerberosPrincipal != null) { + hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true); + hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal); + } + + try { + return HCatUtil.getHiveClient(hiveConf); + } catch (Exception e) { + throw new Exception("Could not instantiate a HiveMetaStoreClient connecting to server uri:[" + serverUri + "]", e); + } + } + + + HCatSchema getHCatSchema(List<RequiredField> fields, String signature, Class<?> classForUDFCLookup) throws IOException { + if (fields == null) { + return null; + } + + Properties props = UDFContext.getUDFContext().getUDFProperties( + classForUDFCLookup, new String[]{signature}); + HCatSchema hcatTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA); + + ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>(); + for (RequiredField rf : fields) { + fcols.add(hcatTableSchema.getFields().get(rf.getIndex())); + } + return new HCatSchema(fcols); + } + + public Table getTable(String location, String hcatServerUri, String hcatServerPrincipal) throws IOException { + Pair<String, String> loc_server = new Pair<String, String>(location, hcatServerUri); + Table hcatTable = hcatTableCache.get(loc_server); + if (hcatTable != null) { + return hcatTable; + } + + Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location); + String dbName = dbTablePair.first; + String tableName = dbTablePair.second; + Table table = null; + HiveMetaStoreClient client = null; + try { + client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class); + table = HCatUtil.getTable(client, dbName, tableName); + } catch (NoSuchObjectException nsoe) { + throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend + } catch (Exception e) { + throw new IOException(e); + } finally { + HCatUtil.closeHiveClientQuietly(client); + } + hcatTableCache.put(loc_server, table); + return table; + } + + public static ResourceSchema getResourceSchema(HCatSchema hcatSchema) throws IOException { + + List<ResourceFieldSchema> rfSchemaList = new ArrayList<ResourceFieldSchema>(); + for (HCatFieldSchema hfs : hcatSchema.getFields()) { + ResourceFieldSchema rfSchema; + rfSchema = getResourceSchemaFromFieldSchema(hfs); + rfSchemaList.add(rfSchema); + } + ResourceSchema rSchema = new ResourceSchema(); + rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0])); + return rSchema; + + } + + private static ResourceFieldSchema getResourceSchemaFromFieldSchema(HCatFieldSchema hfs) + throws IOException { + ResourceFieldSchema rfSchema; + // if we are dealing with a bag or tuple column - need to worry about subschema + if (hfs.getType() == Type.STRUCT) { + rfSchema = new ResourceFieldSchema() + .setName(hfs.getName()) + 
.setDescription(hfs.getComment()) + .setType(getPigType(hfs)) + .setSchema(getTupleSubSchema(hfs)); + } else if (hfs.getType() == Type.ARRAY) { + rfSchema = new ResourceFieldSchema() + .setName(hfs.getName()) + .setDescription(hfs.getComment()) + .setType(getPigType(hfs)) + .setSchema(getBagSubSchema(hfs)); + } else { + rfSchema = new ResourceFieldSchema() + .setName(hfs.getName()) + .setDescription(hfs.getComment()) + .setType(getPigType(hfs)) + .setSchema(null); // no munging inner-schemas + } + return rfSchema; + } + + protected static ResourceSchema getBagSubSchema(HCatFieldSchema hfs) throws IOException { + // there are two cases - array<Type> and array<struct<...>> + // in either case the element type of the array is represented in a + // tuple field schema in the bag's field schema - the second case (struct) + // more naturally translates to the tuple - in the first case (array<Type>) + // we simulate the tuple by putting the single field in a tuple + + Properties props = UDFContext.getUDFContext().getClientSystemProps(); + String innerTupleName = HCatConstants.HCAT_PIG_INNER_TUPLE_NAME_DEFAULT; + if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)) { + innerTupleName = props.getProperty(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME) + .replaceAll("FIELDNAME", hfs.getName()); + } + String innerFieldName = HCatConstants.HCAT_PIG_INNER_FIELD_NAME_DEFAULT; + if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)) { + innerFieldName = props.getProperty(HCatConstants.HCAT_PIG_INNER_FIELD_NAME) + .replaceAll("FIELDNAME", hfs.getName()); + } + + ResourceFieldSchema[] bagSubFieldSchemas = new ResourceFieldSchema[1]; + bagSubFieldSchemas[0] = new ResourceFieldSchema().setName(innerTupleName) + .setDescription("The tuple in the bag") + .setType(DataType.TUPLE); + HCatFieldSchema arrayElementFieldSchema = hfs.getArrayElementSchema().get(0); + if (arrayElementFieldSchema.getType() == Type.STRUCT) { + bagSubFieldSchemas[0].setSchema(getTupleSubSchema(arrayElementFieldSchema)); + } else if (arrayElementFieldSchema.getType() == Type.ARRAY) { + ResourceSchema s = new ResourceSchema(); + List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema)); + s.setFields(lrfs.toArray(new ResourceFieldSchema[0])); + bagSubFieldSchemas[0].setSchema(s); + } else { + ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1]; + innerTupleFieldSchemas[0] = new ResourceFieldSchema().setName(innerFieldName) + .setDescription("The inner field in the tuple in the bag") + .setType(getPigType(arrayElementFieldSchema)) + .setSchema(null); // the element type is not a tuple - so no subschema + bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas)); + } + ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas); + return s; + + } + + private static ResourceSchema getTupleSubSchema(HCatFieldSchema hfs) throws IOException { + // for each struct subfield, create equivalent ResourceFieldSchema + ResourceSchema s = new ResourceSchema(); + List<ResourceFieldSchema> lrfs = new ArrayList<ResourceFieldSchema>(); + for (HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) { + lrfs.add(getResourceSchemaFromFieldSchema(subField)); + } + s.setFields(lrfs.toArray(new ResourceFieldSchema[0])); + return s; + } + + /** + * @param hfs the field schema of the column + * @return corresponding pig type + * @throws IOException + */ + static public byte getPigType(HCatFieldSchema 
hfs) throws IOException { + return getPigType(hfs.getType()); + } + + static public byte getPigType(Type type) throws IOException { + String errMsg; + + if (type == Type.STRING) { + return DataType.CHARARRAY; + } + + if ((type == Type.INT) || (type == Type.SMALLINT) || (type == Type.TINYINT)) { + return DataType.INTEGER; + } + + if (type == Type.ARRAY) { + return DataType.BAG; + } + + if (type == Type.STRUCT) { + return DataType.TUPLE; + } + + if (type == Type.MAP) { + return DataType.MAP; + } + + if (type == Type.BIGINT) { + return DataType.LONG; + } + + if (type == Type.FLOAT) { + return DataType.FLOAT; + } + + if (type == Type.DOUBLE) { + return DataType.DOUBLE; + } + + if (type == Type.BINARY) { + return DataType.BYTEARRAY; + } + + if (type == Type.BOOLEAN) { + errMsg = "HCatalog column type 'BOOLEAN' is not supported in " + + "Pig as a column type"; + throw new PigException(errMsg, PIG_EXCEPTION_CODE); + } + + errMsg = "HCatalog column type '" + type.toString() + "' is not supported in Pig as a column type"; + throw new PigException(errMsg, PIG_EXCEPTION_CODE); + } + + public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Exception { + if (hr == null) { + return null; + } + return transformToTuple(hr.getAll(), hs); + } + + @SuppressWarnings("unchecked") + public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception { + Object result; + Type itemType = hfs.getType(); + switch (itemType) { + case BINARY: + result = (o == null) ? null : new DataByteArray((byte[]) o); + break; + case STRUCT: + result = transformToTuple((List<Object>) o, hfs); + break; + case ARRAY: + result = transformToBag((List<? extends Object>) o, hfs); + break; + case MAP: + result = transformToPigMap((Map<String, Object>) o, hfs); + break; + default: + result = o; + break; + } + return result; + } + + public static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception { + try { + return transformToTuple(objList, hfs.getStructSubSchema()); + } catch (Exception e) { + if (hfs.getType() != Type.STRUCT) { + throw new Exception("Expected Struct type, got " + hfs.getType(), e); + } else { + throw e; + } + } + } + + public static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception { + if (objList == null) { + return null; + } + Tuple t = tupFac.newTuple(objList.size()); + List<HCatFieldSchema> subFields = hs.getFields(); + for (int i = 0; i < subFields.size(); i++) { + t.set(i, extractPigObject(objList.get(i), subFields.get(i))); + } + return t; + } + + public static Map<String, Object> transformToPigMap(Map<String, Object> map, HCatFieldSchema hfs) throws Exception { + if (map == null) { + return null; + } + + Map<String, Object> result = new HashMap<String, Object>(); + for (Entry<String, Object> entry : map.entrySet()) { + result.put(entry.getKey(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0))); + } + return result; + } + + @SuppressWarnings("unchecked") + public static DataBag transformToBag(List<? 
extends Object> list, HCatFieldSchema hfs) throws Exception { + if (list == null) { + return null; + } + + HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0); + DataBag db = new DefaultDataBag(); + for (Object o : list) { + Tuple tuple; + if (elementSubFieldSchema.getType() == Type.STRUCT) { + tuple = transformToTuple((List<Object>) o, elementSubFieldSchema); + } else { + // bags always contain tuples + tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema)); + } + db.add(tuple); + } + return db; + } + + + private static void validateHCatSchemaFollowsPigRules(HCatSchema tblSchema) throws PigException { + for (HCatFieldSchema hcatField : tblSchema.getFields()) { + validateHcatFieldFollowsPigRules(hcatField); + } + } + + private static void validateHcatFieldFollowsPigRules(HCatFieldSchema hcatField) throws PigException { + try { + Type hType = hcatField.getType(); + switch (hType) { + case BOOLEAN: + throw new PigException("Incompatible type found in hcat table schema: " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE); + case ARRAY: + validateHCatSchemaFollowsPigRules(hcatField.getArrayElementSchema()); + break; + case STRUCT: + validateHCatSchemaFollowsPigRules(hcatField.getStructSubSchema()); + break; + case MAP: + // key is only string + validateHCatSchemaFollowsPigRules(hcatField.getMapValueSchema()); + break; + } + } catch (HCatException e) { + throw new PigException("Incompatible type found in hcat table schema: " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE, e); + } + } + + + public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException { + validateHCatSchemaFollowsPigRules(hcatTableSchema); + } + + public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) { + if (p.getProperty(propName) != null) { + config.set(propName, p.getProperty(propName)); + } + } + + public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) { + if (config.get(propName) != null) { + p.setProperty(propName, config.get(propName)); + } } - } }
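
For reference (not part of this commit): the PigHCatUtil change above is whitespace-only, and the HCat-to-Pig type mapping in getPigType() is unchanged. A minimal sketch of that mapping as seen by a caller — the class name is hypothetical, and it assumes HCatFieldSchema.Type resolves from org.apache.hcatalog.data.schema as elsewhere in this tree:

    import java.io.IOException;
    import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
    import org.apache.hcatalog.pig.PigHCatUtil;
    import org.apache.pig.data.DataType;

    public class TypeMappingSketch {
        public static void main(String[] args) throws IOException {
            // STRING -> CHARARRAY, BIGINT -> LONG, ARRAY -> BAG, STRUCT -> TUPLE, BINARY -> BYTEARRAY
            System.out.println(PigHCatUtil.getPigType(Type.STRING) == DataType.CHARARRAY); // true
            System.out.println(PigHCatUtil.getPigType(Type.BIGINT) == DataType.LONG);      // true
            System.out.println(PigHCatUtil.getPigType(Type.ARRAY) == DataType.BAG);        // true
            // Type.BOOLEAN is rejected with a PigException (error code 1115).
        }
    }
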
Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff ============================================================================== --- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java (original) +++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java Mon Sep 10 23:28:55 2012 @@ -40,156 +40,156 @@ import org.apache.pig.data.Tuple; /** * based on {@link org.apache.pig.builtin.PigStorage} */ -public class LoadFuncBasedInputFormat extends InputFormat<BytesWritable,Tuple> { +public class LoadFuncBasedInputFormat extends InputFormat<BytesWritable, Tuple> { - private final LoadFunc loadFunc; - private static ResourceFieldSchema[] fields; + private final LoadFunc loadFunc; + private static ResourceFieldSchema[] fields; - public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location, Configuration conf) throws IOException { + public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location, Configuration conf) throws IOException { - this.loadFunc = loadFunc; - fields = dataSchema.getFields(); - - // Simulate the frontend call sequence for LoadFunc, in case LoadFunc need to store something into UDFContext (as JsonLoader does) - if (loadFunc instanceof LoadMetadata) { - ((LoadMetadata)loadFunc).getSchema(location, new Job(conf)); - } - } + this.loadFunc = loadFunc; + fields = dataSchema.getFields(); - @Override - public RecordReader<BytesWritable, Tuple> createRecordReader( - InputSplit split, TaskAttemptContext taskContext) throws IOException, - InterruptedException { - RecordReader<BytesWritable,Tuple> reader = loadFunc.getInputFormat().createRecordReader(split, taskContext); - return new LoadFuncBasedRecordReader(reader, loadFunc); - } - - @Override - public List<InputSplit> getSplits(JobContext jobContext) throws IOException, - InterruptedException { - try { - InputFormat<BytesWritable,Tuple> inpFormat = loadFunc.getInputFormat(); - return inpFormat.getSplits(jobContext); + // Simulate the frontend call sequence for LoadFunc, in case LoadFunc need to store something into UDFContext (as JsonLoader does) + if (loadFunc instanceof LoadMetadata) { + ((LoadMetadata) loadFunc).getSchema(location, new Job(conf)); + } + } - } catch (InterruptedException e) { - throw new IOException(e); + @Override + public RecordReader<BytesWritable, Tuple> createRecordReader( + InputSplit split, TaskAttemptContext taskContext) throws IOException, + InterruptedException { + RecordReader<BytesWritable, Tuple> reader = loadFunc.getInputFormat().createRecordReader(split, taskContext); + return new LoadFuncBasedRecordReader(reader, loadFunc); } - } - static class LoadFuncBasedRecordReader extends RecordReader<BytesWritable, Tuple> { + @Override + public List<InputSplit> getSplits(JobContext jobContext) throws IOException, + InterruptedException { + try { + InputFormat<BytesWritable, Tuple> inpFormat = loadFunc.getInputFormat(); + return inpFormat.getSplits(jobContext); + + } catch (InterruptedException e) { + throw new IOException(e); + } + } - private Tuple tupleFromDisk; - private final RecordReader<BytesWritable,Tuple> reader; - private final 
LoadFunc loadFunc; - private final LoadCaster caster; + static class LoadFuncBasedRecordReader extends RecordReader<BytesWritable, Tuple> { - /** - * @param reader - * @param loadFunc - * @throws IOException - */ - public LoadFuncBasedRecordReader(RecordReader<BytesWritable,Tuple> reader, LoadFunc loadFunc) throws IOException { - this.reader = reader; - this.loadFunc = loadFunc; - this.caster = loadFunc.getLoadCaster(); - } - - @Override - public void close() throws IOException { - reader.close(); - } - - @Override - public BytesWritable getCurrentKey() throws IOException, - InterruptedException { - return null; - } - - @Override - public Tuple getCurrentValue() throws IOException, InterruptedException { - - for(int i = 0; i < tupleFromDisk.size(); i++) { - - Object data = tupleFromDisk.get(i); - - // We will do conversion for bytes only for now - if (data instanceof DataByteArray) { - - DataByteArray dba = (DataByteArray) data; - - if(dba == null) { - // PigStorage will insert nulls for empty fields. - tupleFromDisk.set(i, null); - continue; + private Tuple tupleFromDisk; + private final RecordReader<BytesWritable, Tuple> reader; + private final LoadFunc loadFunc; + private final LoadCaster caster; + + /** + * @param reader + * @param loadFunc + * @throws IOException + */ + public LoadFuncBasedRecordReader(RecordReader<BytesWritable, Tuple> reader, LoadFunc loadFunc) throws IOException { + this.reader = reader; + this.loadFunc = loadFunc; + this.caster = loadFunc.getLoadCaster(); + } + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public BytesWritable getCurrentKey() throws IOException, + InterruptedException { + return null; + } + + @Override + public Tuple getCurrentValue() throws IOException, InterruptedException { + + for (int i = 0; i < tupleFromDisk.size(); i++) { + + Object data = tupleFromDisk.get(i); + + // We will do conversion for bytes only for now + if (data instanceof DataByteArray) { + + DataByteArray dba = (DataByteArray) data; + + if (dba == null) { + // PigStorage will insert nulls for empty fields. 
+ tupleFromDisk.set(i, null); + continue; + } + + switch (fields[i].getType()) { + + case DataType.CHARARRAY: + tupleFromDisk.set(i, caster.bytesToCharArray(dba.get())); + break; + + case DataType.INTEGER: + tupleFromDisk.set(i, caster.bytesToInteger(dba.get())); + break; + + case DataType.FLOAT: + tupleFromDisk.set(i, caster.bytesToFloat(dba.get())); + break; + + case DataType.LONG: + tupleFromDisk.set(i, caster.bytesToLong(dba.get())); + break; + + case DataType.DOUBLE: + tupleFromDisk.set(i, caster.bytesToDouble(dba.get())); + break; + + case DataType.MAP: + tupleFromDisk.set(i, caster.bytesToMap(dba.get())); + break; + + case DataType.BAG: + tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i])); + break; + + case DataType.TUPLE: + tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i])); + break; + + default: + throw new IOException("Unknown Pig type in data: " + fields[i].getType()); + } + } } - - switch(fields[i].getType()) { - - case DataType.CHARARRAY: - tupleFromDisk.set(i, caster.bytesToCharArray(dba.get())); - break; - - case DataType.INTEGER: - tupleFromDisk.set(i, caster.bytesToInteger(dba.get())); - break; - - case DataType.FLOAT: - tupleFromDisk.set(i, caster.bytesToFloat(dba.get())); - break; - - case DataType.LONG: - tupleFromDisk.set(i, caster.bytesToLong(dba.get())); - break; - - case DataType.DOUBLE: - tupleFromDisk.set(i, caster.bytesToDouble(dba.get())); - break; - - case DataType.MAP: - tupleFromDisk.set(i, caster.bytesToMap(dba.get())); - break; - - case DataType.BAG: - tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i])); - break; - - case DataType.TUPLE: - tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i])); - break; - - default: - throw new IOException("Unknown Pig type in data: "+fields[i].getType()); - } - } - } - - return tupleFromDisk; - } - - - @Override - public void initialize(InputSplit split, TaskAttemptContext ctx) - throws IOException, InterruptedException { - - reader.initialize(split, ctx); - loadFunc.prepareToRead(reader, null); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - - // even if we don't need any data from disk, we will need to call - // getNext() on pigStorage() so we know how many rows to emit in our - // final output - getNext() will eventually return null when it has - // read all disk data and we will know to stop emitting final output - tupleFromDisk = loadFunc.getNext(); - return tupleFromDisk != null; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return 0; - } - } + return tupleFromDisk; + } + + + @Override + public void initialize(InputSplit split, TaskAttemptContext ctx) + throws IOException, InterruptedException { + + reader.initialize(split, ctx); + loadFunc.prepareToRead(reader, null); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + + // even if we don't need any data from disk, we will need to call + // getNext() on pigStorage() so we know how many rows to emit in our + // final output - getNext() will eventually return null when it has + // read all disk data and we will know to stop emitting final output + tupleFromDisk = loadFunc.getNext(); + return tupleFromDisk != null; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + } } Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff ============================================================================== --- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java (original) +++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java Mon Sep 10 23:28:55 2012 @@ -38,82 +38,84 @@ import org.apache.pig.StoreMetadata; import org.apache.pig.data.Tuple; public class StoreFuncBasedOutputFormat extends - OutputFormat<BytesWritable, Tuple> { + OutputFormat<BytesWritable, Tuple> { private final StoreFuncInterface storeFunc; - + public StoreFuncBasedOutputFormat(StoreFuncInterface storeFunc) { this.storeFunc = storeFunc; } - + @Override public void checkOutputSpecs(JobContext jobContext) throws IOException, - InterruptedException { - OutputFormat<BytesWritable,Tuple> outputFormat = storeFunc.getOutputFormat(); + InterruptedException { + OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat(); outputFormat.checkOutputSpecs(jobContext); } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext ctx) - throws IOException, InterruptedException { + throws IOException, InterruptedException { String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); - OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(serializedJobInfo); + OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo); ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema()); String location = outputJobInfo.getLocation(); - OutputFormat<BytesWritable,Tuple> outputFormat = storeFunc.getOutputFormat(); + OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat(); return new StoreFuncBasedOutputCommitter(storeFunc, outputFormat.getOutputCommitter(ctx), location, rs); } @Override public RecordWriter<BytesWritable, Tuple> getRecordWriter( - TaskAttemptContext ctx) throws IOException, InterruptedException { - RecordWriter<BytesWritable,Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx); + TaskAttemptContext ctx) throws IOException, InterruptedException { + RecordWriter<BytesWritable, Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx); String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); - OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(serializedJobInfo); + OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo); ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema()); String location = outputJobInfo.getLocation(); return new StoreFuncBasedRecordWriter(writer, storeFunc, location, rs); } - + static class StoreFuncBasedRecordWriter extends RecordWriter<BytesWritable, Tuple> { - private final RecordWriter<BytesWritable,Tuple> writer; + private final RecordWriter<BytesWritable, Tuple> writer; private final StoreFuncInterface storeFunc; private final ResourceSchema schema; private final String location; - - public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable,Tuple> writer, StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException { + + public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable, Tuple> writer, StoreFuncInterface 
sf, String location, ResourceSchema rs) throws IOException { this.writer = writer; this.storeFunc = sf; this.schema = rs; this.location = location; storeFunc.prepareToWrite(writer); } - + @Override public void close(TaskAttemptContext ctx) throws IOException, - InterruptedException { + InterruptedException { writer.close(ctx); } @Override public void write(BytesWritable key, Tuple value) throws IOException, - InterruptedException { + InterruptedException { storeFunc.putNext(value); } } - + static class StoreFuncBasedOutputCommitter extends OutputCommitter { StoreFuncInterface sf; OutputCommitter wrappedOutputCommitter; String location; ResourceSchema rs; + public StoreFuncBasedOutputCommitter(StoreFuncInterface sf, OutputCommitter outputCommitter, String location, ResourceSchema rs) { this.sf = sf; this.wrappedOutputCommitter = outputCommitter; this.location = location; this.rs = rs; } + @Override public void abortTask(TaskAttemptContext context) throws IOException { wrappedOutputCommitter.abortTask(context); @@ -126,7 +128,7 @@ public class StoreFuncBasedOutputFormat @Override public boolean needsTaskCommit(TaskAttemptContext context) - throws IOException { + throws IOException { return wrappedOutputCommitter.needsTaskCommit(context); } @@ -139,28 +141,28 @@ public class StoreFuncBasedOutputFormat public void setupTask(TaskAttemptContext context) throws IOException { wrappedOutputCommitter.setupTask(context); } - + public void commitJob(JobContext context) throws IOException { wrappedOutputCommitter.commitJob(context); if (sf instanceof StoreMetadata) { if (rs != null) { ((StoreMetadata) sf).storeSchema( - rs, location, new Job(context.getConfiguration()) ); + rs, location, new Job(context.getConfiguration())); } } } - + @Override public void cleanupJob(JobContext context) throws IOException { wrappedOutputCommitter.cleanupJob(context); if (sf instanceof StoreMetadata) { if (rs != null) { ((StoreMetadata) sf).storeSchema( - rs, location, new Job(context.getConfiguration()) ); + rs, location, new Job(context.getConfiguration())); } } } - + public void abortJob(JobContext context, JobStatus.State state) throws IOException { wrappedOutputCommitter.abortJob(context, state); } Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java?rev=1383152&r1=1383151&r2=1383152&view=diff ============================================================================== --- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java (original) +++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java Mon Sep 10 23:28:55 2012 @@ -40,138 +40,140 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.data.Tuple; public class MockLoader extends LoadFunc { - private static final class MockRecordReader extends RecordReader<Object, Object> { - @Override - public void close() throws IOException { - } - - @Override - public Object getCurrentKey() throws IOException, InterruptedException { - return "mockKey"; - } - - @Override - public Object getCurrentValue() throws IOException, InterruptedException { - return "mockValue"; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return 0.5f; + private static final class MockRecordReader extends RecordReader<Object, Object> { + @Override + public void 
close() throws IOException { + } + + @Override + public Object getCurrentKey() throws IOException, InterruptedException { + return "mockKey"; + } + + @Override + public Object getCurrentValue() throws IOException, InterruptedException { + return "mockValue"; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0.5f; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException, + InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return true; + } + } + + private static final class MockInputSplit extends InputSplit implements Writable { + private String location; + + public MockInputSplit() { + } + + public MockInputSplit(String location) { + this.location = location; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[]{location}; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return 10000000; + } + + @Override + public boolean equals(Object arg0) { + return arg0 == this; + } + + @Override + public int hashCode() { + return location.hashCode(); + } + + @Override + public void readFields(DataInput arg0) throws IOException { + location = arg0.readUTF(); + } + + @Override + public void write(DataOutput arg0) throws IOException { + arg0.writeUTF(location); + } + } + + private static final class MockInputFormat extends InputFormat { + + private final String location; + + public MockInputFormat(String location) { + this.location = location; + } + + @Override + public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1) + throws IOException, InterruptedException { + return new MockRecordReader(); + } + + @Override + public List getSplits(JobContext arg0) throws IOException, InterruptedException { + return Arrays.asList(new MockInputSplit(location)); + } } - @Override - public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException, - InterruptedException { - } + private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>(); - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return true; + public static void setData(String location, Iterable<Tuple> data) { + locationToData.put(location, data); } - } - private static final class MockInputSplit extends InputSplit implements Writable { private String location; - public MockInputSplit() { - } - public MockInputSplit(String location) { - this.location = location; - } - @Override - public String[] getLocations() throws IOException, InterruptedException { - return new String[] { location }; - } + private Iterator<Tuple> data; @Override - public long getLength() throws IOException, InterruptedException { - return 10000000; + public String relativeToAbsolutePath(String location, Path curDir) throws IOException { + return location; } @Override - public boolean equals(Object arg0) { - return arg0==this; + public void setLocation(String location, Job job) throws IOException { + this.location = location; + if (location == null) { + throw new IOException("null location passed to MockLoader"); + } + this.data = locationToData.get(location).iterator(); + if (this.data == null) { + throw new IOException("No data configured for location: " + location); + } } @Override - public int hashCode() { - return location.hashCode(); + public Tuple getNext() throws IOException { + if (data == null) 
{ + throw new IOException("data was not correctly initialized in MockLoader"); + } + return data.hasNext() ? data.next() : null; } @Override - public void readFields(DataInput arg0) throws IOException { - location = arg0.readUTF(); + public InputFormat getInputFormat() throws IOException { + return new MockInputFormat(location); } @Override - public void write(DataOutput arg0) throws IOException { - arg0.writeUTF(location); + public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException { } - } - - private static final class MockInputFormat extends InputFormat { - - private final String location; - - public MockInputFormat(String location) { - this.location = location; - } - - @Override - public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1) - throws IOException, InterruptedException { - return new MockRecordReader(); - } - - @Override - public List getSplits(JobContext arg0) throws IOException, InterruptedException { - return Arrays.asList(new MockInputSplit(location)); - } - } - - private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>(); - - public static void setData(String location, Iterable<Tuple> data) { - locationToData.put(location, data); - } - - private String location; - - private Iterator<Tuple> data; - - @Override - public String relativeToAbsolutePath(String location, Path curDir) throws IOException { - return location; - } - - @Override - public void setLocation(String location, Job job) throws IOException { - this.location = location; - if (location == null) { - throw new IOException("null location passed to MockLoader"); - } - this.data = locationToData.get(location).iterator(); - if (this.data == null) { - throw new IOException("No data configured for location: "+location); - } - } - - @Override - public Tuple getNext() throws IOException { - if (data == null) { - throw new IOException("data was not correctly initialized in MockLoader"); - } - return data.hasNext() ? data.next() : null; - } - - @Override - public InputFormat getInputFormat() throws IOException { - return new MockInputFormat(location); - } - - @Override - public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException { - } } Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java?rev=1383152&r1=1383151&r2=1383152&view=diff ============================================================================== --- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java (original) +++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java Mon Sep 10 23:28:55 2012 @@ -24,15 +24,16 @@ import org.apache.pig.data.Tuple; public class MyPigStorage extends PigStorage { - String arg2; - public MyPigStorage(String arg1, String arg2) throws IOException { - super(arg1); - this.arg2 = arg2; - } - - @Override - public void putNext(Tuple t) throws IOException { - t.append(arg2); - super.putNext(t); - } + String arg2; + + public MyPigStorage(String arg1, String arg2) throws IOException { + super(arg1); + this.arg2 = arg2; + } + + @Override + public void putNext(Tuple t) throws IOException { + t.append(arg2); + super.putNext(t); + } }
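
For reference (not part of this commit): the MockLoader test class above replays tuples registered per location through a static map. A minimal sketch of wiring up test data — the class name and location string are made up, and it relies only on the public setData(String, Iterable<Tuple>) shown in the diff:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hcatalog.pig.MockLoader;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class MockLoaderSketch {
        public static void registerSampleData() {
            TupleFactory tf = TupleFactory.getInstance();
            List<Tuple> rows = Arrays.asList(
                    tf.newTuple(Arrays.<Object>asList("a", 1)),
                    tf.newTuple(Arrays.<Object>asList("b", 2)));
            // setData() populates the static location -> data map; setLocation()
            // and getNext() later replay these tuples when the location is loaded.
            MockLoader.setData("mock://sample", rows);
        }
    }

A Pig script running in the same JVM could then read the rows back with something like: A = LOAD 'mock://sample' USING org.apache.hcatalog.pig.MockLoader();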