Sorry for sending this to the wrong (dev) list. I've sent a new one to the user list.
On Tue, Jun 10, 2014 at 10:45 AM, Christian Tzolov <christian.tzo...@gmail.com> wrote:
> Hi all,
>
> I am trying to create a Crunch data source for a custom InputFormat that
> has the signature CustomInputFormat<Void, CustomNonWritableClass>.
>
> Sorry for the long mail. I've tried two implementations, without success.
> I must be missing something, but I'm not sure what.
>
>
> 1. Implementation: derive PType<Pair<Void, CustomNonWritableClass>> using
> MapWritable as the base type
> ----------------------------------------------------------------------
>
> PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(
>     (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),
>     new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
>       public Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}
>     },
>     new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
>       public MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}
>     },
>     typeFamily.records(MapWritable.class)
> );
>
> public class CustomDataSource extends FileTableSourceImpl<Void, CustomNonWritableClass> {
>
>   public CustomDataSource() {
>     super(new Path("xsource"),
>         (PTableType<Void, CustomNonWritableClass>) derivedType,
>         FormatBundle.forInput(CustomInputFormat.class));
>   }
>   ...
> }
>
> With this implementation, the job fails before it is even submitted, with
> the following error:
>
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.crunch.types.writable.WritableType cannot be cast to
> org.apache.crunch.types.PTableType
>   at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)
>
>
> 2. Implementation: derive PType<CustomNonWritableClass> using MapWritable
> as the base type
> ----------------------------------------------------------------------
>
> public static class MapWritableToCustomNonWritableClass
>     extends MapFn<MapWritable, CustomNonWritableClass> {
>   public CustomNonWritableClass map(MapWritable input) {...}
> }
>
> public static class CustomNonWritableClassToMapWritable
>     extends MapFn<CustomNonWritableClass, MapWritable> {
>   public MapWritable map(CustomNonWritableClass input) {...}
> }
>
> PType<CustomNonWritableClass> derivedType = typeFamily.derived(
>     CustomNonWritableClass.class,
>     new MapWritableToCustomNonWritableClass(),
>     new CustomNonWritableClassToMapWritable(),
>     typeFamily.records(MapWritable.class)
> );
>
> public class CustomDataSource extends FileSourceImpl<CustomNonWritableClass> {
>
>   public CustomDataSource() {
>     super(new Path("xsource"),
>         derivedType,
>         FormatBundle.forInput(CustomInputFormat.class));
>   }
>   ...
> }
>
> When run, this version is submitted to the cluster and the MR job starts,
> but the task eventually fails with:
>
> 2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
> org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
> attempt_1401786307497_0078_m_000000_0 - exited :
> java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass cannot be
> cast to org.apache.hadoop.io.MapWritable
>   at com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
>   at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>
>
> Thanks,
> Christian
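For reference, below is a simplified, stdlib-only Java model of what the second stack trace appears to show: the derived type's input function (CompositeMapFn in the trace) first applies the base type's input function and then MapWritableToCustomNonWritableClass to whatever the record reader emits. FakeMapWritable, CustomValue, compose() and the field names are hypothetical stand-ins, not Crunch or Hadoop code. Treating the base MapWritable type's input function as a pass-through is also an assumption. The sketch only illustrates that if CustomInputFormat already emits CustomNonWritableClass values, the cast inside the user MapFn fails the same way as in the task log.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model (NOT Crunch code) of a derived type's input path.
public class DerivedTypeModel {

    // Hypothetical stand-in for org.apache.hadoop.io.MapWritable.
    static final class FakeMapWritable {
        final Map<String, String> entries = new HashMap<>();
    }

    // Hypothetical stand-in for CustomNonWritableClass.
    static final class CustomValue {
        final String name;
        CustomValue(String name) { this.name = name; }
    }

    // Like a composite map fn: run the first function, then the second on its result.
    static <A, B, C> Function<A, C> compose(Function<A, B> first, Function<B, C> second) {
        return a -> second.apply(first.apply(a));
    }

    // Assumption: the base "MapWritable" type passes the raw record through unchanged.
    static final Function<Object, Object> BASE_INPUT_FN = raw -> raw;

    // Analogous to MapWritableToCustomNonWritableClass: assumes the record is a map.
    static final Function<Object, CustomValue> USER_INPUT_FN =
        raw -> new CustomValue(((FakeMapWritable) raw).entries.get("name"));

    // The derived type's input function: base fn first, then the user fn.
    static final Function<Object, CustomValue> DERIVED_INPUT_FN =
        compose(BASE_INPUT_FN, USER_INPUT_FN);

    public static void main(String[] args) {
        // Case 1: the reader emits the base type the derived PType was declared over.
        FakeMapWritable mw = new FakeMapWritable();
        mw.entries.put("name", "ok");
        System.out.println(DERIVED_INPUT_FN.apply(mw).name); // prints "ok"

        // Case 2: the reader already emits the custom class, so the user fn's cast fails.
        try {
            DERIVED_INPUT_FN.apply(new CustomValue("direct"));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the task log");
        }
    }
}
```

Case 1 is the path the derived type was written for; case 2 matches what the log reports. That the real record reader emits CustomNonWritableClass directly is an inference from the trace, not something confirmed here.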