[ https://issues.apache.org/jira/browse/SPARK-12518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhanpeng Wu updated SPARK-12518:
--------------------------------
    Description: 
When I used [htsjdk|https://github.com/samtools/htsjdk] in my Spark application, I ran into a problem with record deserialization: the *SAMRecord* objects could not be deserialized, and the job threw the following exception:
{quote}
WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: htsjdk.samtools.util.RuntimeIOException
        at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:340)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{quote}
From the stack trace, the driver cannot even load htsjdk.samtools.util.RuntimeIOException while deserializing the task failure reason, so the real task exception is hidden; my guess is that the task itself hit a premature EOF while deserializing the records.
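Just to illustrate what I mean by the real exception being hidden (a toy example of mine, nothing to do with htsjdk or my actual code): if the reading side cannot resolve the exception's class, plain Java deserialization fails with ClassNotFoundException and the original error message is never seen.
{code:title=MaskingDemo.java|borderStyle=solid}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

public class MaskingDemo {
    public static void main(String[] args) throws Exception {
        // Write an exception with normal Java serialization.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new IllegalStateException("premature EOF while decoding a record"));
        }

        // Read it back, but pretend the exception class is missing on this side
        // (as if the jar were not visible to the reader).
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                throw new ClassNotFoundException(desc.getName());
            }
        }) {
            in.readObject();
        } catch (ClassNotFoundException e) {
            // The original message ("premature EOF ...") is lost; only the class name remains.
            System.out.println("Exception could not be deserialized: " + e.getMessage());
        }
    }
}
{code}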

Here is my test code: 
{code:title=Test.java|borderStyle=solid}
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

// BAMInputFormat comes from the BAM input-format dependency I use;
// SAMRecordWritable is my own class, shown below.
public class Test {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("Spark htsjdk - Test ver");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        jsc.newAPIHadoopFile(args[0], BAMInputFormat.class, LongWritable.class, SAMRecordWritable.class,
                        jsc.hadoopConfiguration())
                .map(new Function<Tuple2<LongWritable, SAMRecordWritable>, SAMRecordWritable>() {
                    private static final long serialVersionUID = 1791992620460009575L;

                    @Override
                    public SAMRecordWritable call(Tuple2<LongWritable, SAMRecordWritable> tuple2) throws Exception {
                        // Keep only the value; repartition() then shuffles (and thus serializes) the records.
                        return tuple2._2;
                    }
                }).repartition(18).saveAsTextFile(args[1]);

        jsc.close();
        jsc.stop();
    }

}
{code}
My custom JavaSerializer class: 
{code:title=SAMRecordWritable.java|borderStyle=solid}
public class SAMRecordWritable extends JavaSerializer {
    private static final long serialVersionUID = 8212888201641460871L;

    // Shared codec for decoding, built with a null header and a lazy record factory.
    private static final BAMRecordCodec lazyCodec =
            new BAMRecordCodec(null, new LazyBAMRecordFactory());

    private transient SAMRecord record;

    public SAMRecord get()            { return record; }
    public void      set(SAMRecord r) { record = r; }

    /* JavaSerializer */
    public void writeExternal(java.io.ObjectOutput out) {
        // Encode the SAMRecord into the object stream with htsjdk's BAMRecordCodec.
        final BAMRecordCodec codec = new BAMRecordCodec(record.getHeader());
        codec.setOutputStream(new DataOutputWrapper(out));
        codec.encode(record);
    }

    public void readExternal(java.io.ObjectInput in) {
        // Decode the record back from the object stream.
        lazyCodec.setInputStream(new DataInputWrapper(in));
        record = lazyCodec.decode();
    }
}
{code}
But when I serialize a record to a local file outside of Spark, it works, which confuses me a lot. Can anybody help?
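For reference, the local (non-Spark) round trip I mean looks roughly like this (a minimal sketch, assuming the SAMRecordWritable class above is on the classpath; the read fields and the output path are just placeholders, and error handling is omitted):
{code:title=LocalRoundTrip.java|borderStyle=solid}
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;

public class LocalRoundTrip {
    public static void main(String[] args) throws Exception {
        // Build a trivial unmapped record just for the test.
        SAMRecord rec = new SAMRecord(new SAMFileHeader());
        rec.setReadName("test-read");
        rec.setReadUnmappedFlag(true);
        rec.setReadString("ACGT");
        rec.setBaseQualityString("IIII");

        SAMRecordWritable w = new SAMRecordWritable();
        w.set(rec);

        // Serialize to a local file ...
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("record.bin"))) {
            w.writeExternal(out);
        }

        // ... and read it back.
        SAMRecordWritable copy = new SAMRecordWritable();
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream("record.bin"))) {
            copy.readExternal(in);
        }
        System.out.println(copy.get().getSAMString());
    }
}
{code}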



> Problem in Spark deserialize
> ----------------------------
>
>                 Key: SPARK-12518
>                 URL: https://issues.apache.org/jira/browse/SPARK-12518
>             Project: Spark
>          Issue Type: Question
>          Components: Java API
>    Affects Versions: 1.5.2
>            Reporter: Zhanpeng Wu
>


