Cool, JIRA filed: https://issues.apache.org/jira/browse/CRUNCH-129
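
For anyone finding this thread later: the workaround Josh describes below (calling parallelDo on the grouped table instead of by()) amounts to "group by key, then turn each (key, values) pair into one output record". Here is a rough sketch of that shape using only java.util collections as a stand-in for Crunch. No Crunch API is used, and the names GroupThenMapSketch, Group, groupByKey and collapseGroups are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GroupThenMapSketch {

    /** Hypothetical stand-in for MyAvroGroup: one output record per key. */
    public static final class Group {
        public final List<String> objects = new ArrayList<>();
    }

    /** Stand-in for groupByKey(): collect lines under their key, keeping first-seen key order. */
    public static Map<String, List<String>> groupByKey(List<String> lines,
                                                       Function<String, String> keyFn) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String line : lines) {
            grouped.computeIfAbsent(keyFn.apply(line), k -> new ArrayList<>()).add(line);
        }
        return grouped;
    }

    /** Stand-in for parallelDo over the grouped data: each (key, values) entry becomes one Group. */
    public static List<Group> collapseGroups(Map<String, List<String>> grouped) {
        List<Group> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : grouped.entrySet()) {
            Group group = new Group();
            for (String value : entry.getValue()) {
                group.objects.add(value);
            }
            out.add(group);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a:1", "b:2", "a:3");
        // Assumption for this sketch: the key is the text before the colon.
        Map<String, List<String>> grouped = groupByKey(lines, line -> line.split(":")[0]);
        for (Group g : collapseGroups(grouped)) {
            System.out.println(g.objects);
        }
    }
}
```

The point is that the per-group step only needs a plain one-in, one-out mapping, so it does not have to build a new keyed table from the grouped type, which is where by() trips up in the trace below.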
On Tue, Dec 11, 2012 at 1:21 PM, Josh Wills <[email protected]> wrote:
> No, you're not-- I think that's a bug. Switching to "parallelDo" instead
> of "by" on the groupedData object will work fine, but we should make sure
> the by() operation works on grouped tables.
>
>
> On Tue, Dec 11, 2012 at 1:18 PM, Jonathan Natkins <[email protected]> wrote:
>
>> Alright, I'm back for more. This time, I'm trying to perform a group by
>> with Avro data. What I've currently got is this:
>>
>> PGroupedTable<String, MyAvroObject> processedData =
>>     data.parallelDo(new DoFn<String, Pair<String, MyAvroObject>>() {
>>       public void process(String line,
>>           Emitter<Pair<String, MyAvroObject>> emitter) {
>>         String key = getKey(line);
>>         MyAvroObject value = convertToAvroObject(line);
>>         emitter.emit(Pair.of(key, value));
>>       }
>>     }, Avros.tableOf(Avros.strings(),
>>         Avros.specifics(MyAvroObject.class)))
>>     .groupByKey(3);
>>
>> PTable<MyAvroGroup, Pair<String, Iterable<MyAvroObject>>> groupedData =
>>     processedData.by(new MapFn<Pair<String, Iterable<MyAvroObject>>,
>>         MyAvroGroup>() {
>>       @Override
>>       public MyAvroGroup map(Pair<String, Iterable<MyAvroObject>> input) {
>>         MyAvroGroup group = new MyAvroGroup();
>>         group.objects = Lists.<MyAvroObject>newArrayList();
>>
>>         for (MyAvroObject obj : input.second()) {
>>           group.objects.add(obj);
>>         }
>>
>>         return group;
>>       }
>>     },
>>     Avros.specifics(MyAvroGroup.class));
>>
>> I think this is all pretty sane, but I'm getting an exception when the
>> pipeline attempts to run the by():
>>
>> 12/12/10 14:11:07 WARN util.NativeCodeLoader: Unable to load
>> native-hadoop library for your platform...
>> using builtin-java classes where applicable
>> Exception in thread "main" java.lang.ClassCastException:
>> org.apache.crunch.types.avro.AvroGroupedTableType cannot be cast to
>> org.apache.crunch.types.avro.AvroType
>>     at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:608)
>>     at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:135)
>>     at org.apache.crunch.impl.mem.collect.MemCollection.by(MemCollection.java:222)
>>
>> Am I doing something obviously wrong?
>>
>> Thanks,
>> Natty
>>
>>
>> On Fri, Dec 7, 2012 at 10:58 AM, Jonathan Natkins <[email protected]> wrote:
>>
>>> To bring things full circle, the core issue I was having was caused by
>>> the fact that I was writing the data in the wrong way. Instead of
>>>
>>>     pipeline.writeTextFile(words, args[1]);
>>>
>>> I should have been using
>>>
>>>     pipeline.write(words, To.avroFile(args[1]));
>>>
>>> As Josh noted, writeTextFile was attempting to write my data out as a
>>> String, but I wasn't giving it an object that was easy to turn into a
>>> String, which resulted in an exception. Changing it to write to an Avro
>>> file solved those issues.
>>>
>>> Thanks, Josh!
>>>
>>>
>>> On Fri, Dec 7, 2012 at 10:26 AM, Josh Wills <[email protected]> wrote:
>>>
>>>> Hey Natty,
>>>>
>>>> Reply inlined.
>>>>
>>>>
>>>> On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <[email protected]> wrote:
>>>>
>>>>> Hey Josh,
>>>>>
>>>>> That really doesn't solve the problem I'm facing. Avros.specifics
>>>>> assumes that I've got a Java file that Avro generated for me, which I
>>>>> don't have. I can certainly go through the trouble of getting that
>>>>> file, but what I've got currently is a POJO that I'm associating with
>>>>> a JSON Avro schema. It's a perfectly valid use case, and as far as I
>>>>> can tell, from what's provided by the Avros utility class, it should
>>>>> be supported.
>>>>> So here's my question:
>>>>>
>>>>
>>>> Interesting-- I had not hit that use case for Avro before. For a POJO,
>>>> I would just use the reflection APIs, which are available via
>>>> Avros.reflects.
>>>>
>>>>>
>>>>> Is the Avros.generics issue a bug? It seems to me that the T of
>>>>> PType<T> has to implement Writable, and in the case of the return type
>>>>> of Avros.generics, this is not the case.
>>>>>
>>>>
>>>> There's no requirement for the PType<T> to be a Writable, or even an
>>>> Avro instance. There's stuff like o.a.c.types.PTypes.derived that lets
>>>> you create a PType<T> that depends on other PTypes, which is how Crunch
>>>> handles things like protocol buffers/thrift/jackson-style object
>>>> serializations.
>>>>
>>>> I'm just taking a closer look at the Exception that was thrown, and it
>>>> looks to me like the problem is occurring at the end of the pipeline,
>>>> where you're calling pipeline.writeTextFile (not included in the code
>>>> snippet posted). Crunch has to convert the PType to something that can
>>>> be converted to a Writable impl-- if you try to write an Avro object to
>>>> the TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks
>>>> to me that in this case, Crunch can't figure out how to turn
>>>> MyAvroObject into a Writable instance for writing to the
>>>> TextOutputFormat.
>>>>
>>>>> If it's a bug, then fine, I'll file a JIRA and jump through whatever
>>>>> necessary hoops exist.
>>>>>
>>>>
>>>> One way to fix this would be to update writeTextFile to force
>>>> conversion of any non-string that was passed into it into a String via
>>>> an auxiliary MapFn-- I'm not sure why I didn't do that in the first
>>>> place. What do you think?
>>>>
>>>>> Thanks,
>>>>> Natty
>>>>>
>>>>>
>>>>> On Thu, Dec 6, 2012 at 6:08 PM, Josh Wills <[email protected]> wrote:
>>>>>
>>>>>> Did you look at Avros.specifics?
>>>>>> On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <[email protected]> wrote:
>>>>>>
>>>>>>> Ok, I'm still a little confused. Let's say I use Avros.generics(),
>>>>>>> and then I modify my code to use GenericData.Records. Those Records
>>>>>>> still don't implement the Writable interface, so I'm still getting a
>>>>>>> class cast exception. Did I do something totally wrong?
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Dec 6, 2012 at 5:19 PM, Jonathan Natkins <[email protected]> wrote:
>>>>>>>
>>>>>>>> Well, the problem with that is that I really want to work with my
>>>>>>>> objects, rather than use Avros.generics, because then I'm forced to
>>>>>>>> treat everything as a GenericData.Record. It's just a pain in the
>>>>>>>> butt.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 6, 2012 at 5:17 PM, Josh Wills <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> You don't want to create an AvroType yourself, you want to call
>>>>>>>>> o.a.c.types.avro.Avros.records or one of its friends and pass it a
>>>>>>>>> Class object.
>>>>>>>>>
>>>>>>>>> Interesting though, I would still want that case to work correctly.
>>>>>>>>>
>>>>>>>>> Josh
>>>>>>>>>
>>>>>>>>> On Dec 6, 2012 5:14 PM, "Jonathan Natkins" <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> So I've been futzing with Crunch a bit, and trying to understand
>>>>>>>>>> how to build a pipeline that outputs Avro data files.
>>>>>>>>>> Roughly, I'm doing something along these lines:
>>>>>>>>>>
>>>>>>>>>> Schema.Parser schemaParser = new Schema.Parser();
>>>>>>>>>> final Schema avroObjSchema = schemaParser.parse(schemaJsonString);
>>>>>>>>>>
>>>>>>>>>> AvroType avroType = new AvroType<MyAvroObject>(MyAvroObject.class,
>>>>>>>>>>     avroObjSchema,
>>>>>>>>>>     new AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
>>>>>>>>>>         MyAvroObject.class, avroObjSchema));
>>>>>>>>>>
>>>>>>>>>> PCollection<MyAvroObject> words = logs.parallelDo(
>>>>>>>>>>     new DoFn<String, MyAvroObject>() {
>>>>>>>>>>       public void process(String line, Emitter<MyAvroObject> emitter) {
>>>>>>>>>>         emitter.emit(convertStringToAvroObj(line));
>>>>>>>>>>       }
>>>>>>>>>>     }, avroType);
>>>>>>>>>>
>>>>>>>>>> However, this results in a class cast exception:
>>>>>>>>>>
>>>>>>>>>> Exception in thread "main" java.lang.ClassCastException: class
>>>>>>>>>> com.company.MyAvroObject
>>>>>>>>>>     at java.lang.Class.asSubclass(Class.java:3039)
>>>>>>>>>>     at org.apache.crunch.types.writable.Writables.records(Writables.java:250)
>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
>>>>>>>>>>     at org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.as(WritableTypeFamily.java:135)
>>>>>>>>>>     at org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)
>>>>>>>>>>
>>>>>>>>>> Anybody have any thoughts? There's got to be a magical
>>>>>>>>>> incantation that I have slightly off.
>>>>>>>>>>
>>>>
>>>> --
>>>> Director of Data Science
>>>> Cloudera <http://www.cloudera.com>
>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>
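
A note on the original stack trace at the bottom of this thread: its top frame is java.lang.Class.asSubclass, called from Writables.records. Class.asSubclass throws ClassCastException whenever the receiving class is not a subtype of the class passed in, which fits Josh's reading that Crunch was trying to treat MyAvroObject as a Writable on the way to writeTextFile. A small sketch of that JDK behavior, using only the standard library. WritableLike, PlainRecord and WritableRecord are hypothetical stand-ins, and exactly which class Crunch passes to asSubclass is an assumption here, not something the trace shows.

```java
public class AsSubclassSketch {

    /** Hypothetical stand-in for Hadoop's Writable interface (not the real one). */
    public interface WritableLike {}

    /** Hypothetical POJO that, like MyAvroObject, does not implement the interface. */
    public static final class PlainRecord {}

    /** Hypothetical record type that does implement the interface. */
    public static final class WritableRecord implements WritableLike {}

    /** Returns true when clazz can be narrowed to WritableLike, false when asSubclass throws. */
    public static boolean canTreatAsWritable(Class<?> clazz) {
        try {
            clazz.asSubclass(WritableLike.class);
            return true;
        } catch (ClassCastException e) {
            // Same exception type as the top frame of the original trace.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canTreatAsWritable(WritableRecord.class));
        System.out.println(canTreatAsWritable(PlainRecord.class));
    }
}
```

This is why switching the output to To.avroFile, as described earlier in the thread, sidesteps the problem: the Writable conversion is never attempted.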
