[ https://issues.apache.org/jira/browse/CRUNCH-339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13891645#comment-13891645 ]
Micah Whitacre commented on CRUNCH-339:
---------------------------------------
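If your DoFn is like your example (it holds no references and is an
anonymous inner class), the issue might be that the containing class is not
serializable. An anonymous inner class created in an instance method keeps an
implicit reference to the enclosing instance, so serializing the DoFn also
tries to serialize that instance. The guide Josh references calls this out as well.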
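The mechanism can be reproduced without Crunch. In this minimal sketch, `Fn` stands in for Crunch's `DoFn` (which is Serializable) and `Recommender` stands in for `OryxRecommender`. Those names and the `isSerializable` helper are hypothetical and are not Crunch API. The anonymous class created in an instance method is expected to fail the same way the trace does. Both a static nested class and an anonymous class created in a static method should serialize cleanly. In the reporter's code, the analogous change would presumably be to declare the DoFn as a static nested class or to make `loadCandidate` static.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InnerClassCapture {

    // Hypothetical stand-in for Crunch's DoFn, which is Serializable.
    abstract static class Fn implements Serializable {
        abstract long apply(String s);
    }

    // Hypothetical stand-in for OryxRecommender; it does NOT implement Serializable.
    static class Recommender {

        // Like loadCandidate(): an anonymous class created in an instance method
        // keeps a hidden reference to the enclosing Recommender.
        Fn anonymousFn() {
            return new Fn() {
                @Override
                long apply(String s) {
                    return Long.parseLong(s);
                }
            };
        }

        // Fix 1: a static nested class has no enclosing instance.
        static class ParseLongFn extends Fn {
            @Override
            long apply(String s) {
                return Long.parseLong(s);
            }
        }

        Fn nestedFn() {
            return new ParseLongFn();
        }

        // Fix 2: an anonymous class created in a static method has no
        // enclosing instance either.
        static Fn staticFactoryFn() {
            return new Fn() {
                @Override
                long apply(String s) {
                    return Long.parseLong(s);
                }
            };
        }
    }

    // Returns true if obj can be written with Java serialization.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            // The message is the name of the class that could not be written.
            System.out.println("Not serializable: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Recommender r = new Recommender();
        System.out.println(isSerializable(r.anonymousFn()));
        System.out.println(isSerializable(r.nestedFn()));
        System.out.println(isSerializable(Recommender.staticFactoryFn()));
    }
}
```

For the anonymous case, the exception message names the enclosing `Recommender` class. This mirrors the trace, which names `OryxRecommender` and not the DoFn itself.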
> java.io.NotSerializableException when calling PCollection.cache()
> -----------------------------------------------------------------
>
> Key: CRUNCH-339
> URL: https://issues.apache.org/jira/browse/CRUNCH-339
> Project: Crunch
> Issue Type: Bug
> Components: IO
> Affects Versions: 0.8.2
> Environment: CDH4.4.0
> Reporter: Sungwoo Park
>
> When I was debugging an MRPipeline, I found that calling the cache() function
> causes a java.io.NotSerializableException.
> Code:
> (...)
> PCollection<Long> candidates = loadCandidate(pipeline, candidatesPath);
> candidates.cache();
> (...)
> public PCollection<Long> loadCandidate(Pipeline p, Path candidatesPath) {
>     if (candidatesPath == null)
>         return null;
>
>     return p.read(From.textFile(candidatesPath)).parallelDo(new DoFn<String, Long>() {
>         @Override
>         public void process(String stringId, Emitter<Long> emitter) {
>             emitter.emit(Long.parseLong(stringId));
>         }
>     }, Writables.longs());
> }
> Stack trace:
> Exception in thread "main" org.apache.crunch.CrunchRuntimeException: java.io.NotSerializableException: com.coupang.recommender.ever.utils.recommender.OryxRecommender
>     at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:104)
>     at org.apache.crunch.impl.mr.MRPipeline.runAsync(MRPipeline.java:123)
>     at org.apache.crunch.impl.mr.MRPipeline.run(MRPipeline.java:111)
>     at org.apache.crunch.impl.dist.DistributedPipeline.done(DistributedPipeline.java:109)
>     at com.coupang.recommender.ever.utils.recommender.OryxRecommender.run(OryxRecommender.java:108)
>     at com.coupang.recommender.ever.utils.EverUtil.run(EverUtil.java:30)
>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>     at com.coupang.recommender.ever.utils.Driver.main(Driver.java:34)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
> Caused by: java.io.NotSerializableException: com.coupang.recommender.ever.utils.recommender.OryxRecommender
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
>     at java.util.ArrayList.writeObject(ArrayList.java:570)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:940)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1469)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
>     at java.util.ArrayList.writeObject(ArrayList.java:570)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:940)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1469)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
>     at org.apache.crunch.util.DistCache.write(DistCache.java:55)
>     at org.apache.crunch.impl.mr.plan.JobPrototype.serialize(JobPrototype.java:242)
>     at org.apache.crunch.impl.mr.plan.JobPrototype.build(JobPrototype.java:215)
>     at org.apache.crunch.impl.mr.plan.JobPrototype.getCrunchJob(JobPrototype.java:134)
>     at org.apache.crunch.impl.mr.plan.MSCRPlanner.plan(MSCRPlanner.java:165)
>     at org.apache.crunch.impl.mr.MRPipeline.plan(MRPipeline.java:102)
>     ... 12 more
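In the trace, the failure only appears at planning time, inside DistCache.write, after the whole job has already been built. The ObjectOutputStream frames show that plain Java serialization is involved. A unit test that serializes each DoFn directly can catch the problem earlier. The following is a minimal sketch under that assumption. `roundTrip`, `Service`, `LeakyFn` and `ParseLongFn` are hypothetical names and are not Crunch API. The helper writes the object and reads it back. This is meant to roughly approximate the serialized plan being read back on the task side, which is an assumption about Crunch internals and not something the trace shows.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {

    // Hypothetical helper (not part of Crunch): writes obj with Java
    // serialization, reads it back, and fails fast with the underlying cause.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new AssertionError("Serialization round trip failed for "
                    + obj.getClass().getName() + ": " + e, e);
        }
    }

    // Hypothetical non-serializable collaborator, playing the role of OryxRecommender.
    static class Service {}

    // A function object that accidentally holds the non-serializable Service.
    static class LeakyFn implements Serializable {
        final Service service = new Service();
    }

    // A self-contained function object with no outside references.
    static class ParseLongFn implements Serializable {
        long apply(String s) {
            return Long.parseLong(s);
        }
    }

    public static void main(String[] args) {
        // Survives the round trip and still works afterwards.
        System.out.println(roundTrip(new ParseLongFn()).apply("42"));
        try {
            roundTrip(new LeakyFn());
        } catch (AssertionError e) {
            // Reports the NotSerializableException naming the Service class.
            System.out.println(e.getMessage());
        }
    }
}
```

Calling `roundTrip` on each DoFn in a test, before `pipeline.run()` or `done()`, should surface the same NotSerializableException without a cluster or a full planning pass.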
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)