Just repartition to 1 partition before writing.
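
Something along these lines should do it (untested sketch; coalesce(1) merges everything into one partition without a full shuffle, repartition(1) does the same but forces a shuffle):

      // collapse the RDD to a single partition so saveAsTextFile
      // writes one part file instead of one file per partition
      rdd.coalesce(1).saveAsTextFile(outputPath)

You will still get a directory containing a single part-00000 file (plus the usual _SUCCESS marker), so move or rename that file afterwards if you need a plain file at outputPath.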

On Fri, Jun 5, 2015 at 3:46 PM, marcos rebelo <ole...@gmail.com> wrote:

> Hi all
>
> I'm running Spark on a single local machine, no Hadoop, just reading and
> writing to local disk.
>
> I need to have a single file as output of my calculation.
>
> If I do "rdd.saveAsTextFile(...)" everything runs fine, but I get a lot of
> files. Since I need a single file, I was considering doing something like:
>
>       Try(new FileWriter(outputPath)) match {
>         case Success(writer) =>
>           try {
>             rdd.toLocalIterator.foreach { line =>
>               val str = line.toString
>               writer.write(str)
>             }
>           } finally {
>             writer.close()
>           }
>         ...
>       }
>
>
> I get:
>
> [error] o.a.s.e.Executor - Exception in task 0.0 in stage 41.0 (TID 32)
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_45]
>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) ~[na:1.8.0_45]
> [error] o.a.s.u.SparkUncaughtExceptionHandler - Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_45]
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_45]
>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) ~[na:1.8.0_45]
> [error] o.a.s.s.TaskSetManager - Task 0 in stage 41.0 failed 1 times; aborting job
> [warn] application - Can't write to /tmp/err1433498283479.csv: {}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 1 times, most recent failure: Lost task 0.0 in stage 41.0 (TID 32, localhost): java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:3236)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) ~[spark-core_2.10-1.3.1.jar:1.3.1]
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) ~[spark-core_2.10-1.3.1.jar:1.3.1]
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) ~[spark-core_2.10-1.3.1.jar:1.3.1]
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.10.5.jar:na]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.10.5.jar:na]
>
>
> If this rdd.toLocalIterator.foreach(...) approach doesn't work, what is a
> better solution?
>
> Best Regards
> Marcos
>
>
>


-- 
Best Regards,
Ayan Guha
