Hi all,

I am trying to create a Crunch data source for a custom InputFormat with this structure: CustomInputFormat<Void, CustomNonWritableClass>.
Sorry for the long mail. I've tried two implementations without success. I must be missing something, but I'm not sure what.

1. Implementation: Derive PType<Pair<Void, CustomNonWritableClass>> using MapWritable as base type
---------------------------------------------------------------------------------------------------

    PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(
        (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),
        new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
          public Pair<Void, CustomNonWritableClass> map(MapWritable input) {...}
        },
        new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
          public MapWritable map(Pair<Void, CustomNonWritableClass> input) {...}
        },
        typeFamily.records(MapWritable.class)
    );

    public class CustomDataSource extends FileTableSourceImpl<Void, CustomNonWritableClass> {
      public CustomDataSource() {
        super(new Path("xsource"),
              (PTableType<Void, CustomNonWritableClass>) derivedType,
              FormatBundle.forInput(CustomInputFormat.class));
      }
      ...
    }

When I try this implementation, it fails before submitting the job with the following error:

    Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.writable.WritableType cannot be cast to org.apache.crunch.types.PTableType
        at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)

2. Implementation: Derive PType<CustomNonWritableClass> using MapWritable as base type
---------------------------------------------------------------------------------------

    public static class MapWritableToCustomNonWritableClass extends MapFn<MapWritable, CustomNonWritableClass> {
      public CustomNonWritableClass map(MapWritable input) {...}
    }

    public static class CustomNonWritableClassToMapWritable extends MapFn<CustomNonWritableClass, MapWritable> {
      public MapWritable map(CustomNonWritableClass input) {...}
    }

    PType<CustomNonWritableClass> derivedType = typeFamily.derived(
        CustomNonWritableClass.class,
        new MapWritableToCustomNonWritableClass(),
        new CustomNonWritableClassToMapWritable(),
        typeFamily.records(MapWritable.class)
    );

    public class CustomDataSource extends FileSourceImpl<CustomNonWritableClass> {
      public CustomDataSource() {
        super(new Path("xsource"),
              (PTableType<Void, CustomNonWritableClass>) derivedType,
              FormatBundle.forInput(CustomInputFormat.class));
      }
      ...
    }

When run, this gets submitted to the cluster and starts the MR job, but it eventually fails with:

    2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task: attempt_1401786307497_0078_m_000000_0 - exited : java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass cannot be cast to org.apache.hadoop.io.MapWritable
        at com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
        at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
        at org.apache.crunch.MapFn.process(MapFn.java:34)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
        at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

Thanks,
Christian