Good question. At this point, I'd have to re-run it to know for sure. We've
been trying various things, so I'd have to reset the flow config back to
that state.

I can say that since removing persist(DISK_ONLY), the flows have been running
more stably, probably because it removes disk contention. We won't be able to
run our full production flows without some form of disk persistence, but for
testing we're continuing this way for now.
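
A minimal sketch of what I mean (the app name, input path, and RDD below are
placeholders, and MEMORY_AND_DISK is just one option we haven't fully tested):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val sc  = new SparkContext(new SparkConf().setAppName("persist-test"))  // placeholder
    val rdd = sc.textFile("hdfs:///path/to/input")  // stand-in for one of our flow's RDDs

    // What we had been running, and what we removed for these tests
    // (with no explicit persist, stages recompute instead of hitting local disk):
    //   rdd.persist(StorageLevel.DISK_ONLY)

    // A possible compromise for the full production flow, which will need
    // some form of disk persistence:
    rdd.persist(StorageLevel.MEMORY_AND_DISK)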

I can try tomorrow if you'd like.

-Suren



On Wed, Jun 18, 2014 at 8:35 PM, Patrick Wendell <pwend...@gmail.com> wrote:

> Just wondering, do you get this particular exception if you are not
> consolidating shuffle data?
>
> On Wed, Jun 18, 2014 at 12:15 PM, Mridul Muralidharan <mri...@gmail.com>
> wrote:
> > On Wed, Jun 18, 2014 at 6:19 PM, Surendranauth Hiraman
> > <suren.hira...@velos.io> wrote:
> >> Patrick,
> >>
> >> My team is using shuffle consolidation but not speculation. We are also
> >> using persist(DISK_ONLY) for caching.
> >
> >
> > Use of shuffle consolidation is probably what is causing the issue. It
> > would be a good idea to try again with that turned off (which is the
> > default).
> >
> > It should most likely get fixed in the 1.1 timeframe.
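> >
> > For reference, turning it off is a one-line change on the driver conf (a
> > sketch; it is the same property your snippet below sets to true):
> >
> >     // shuffle file consolidation is off by default in 1.0
> >     conf.set("spark.shuffle.consolidateFiles", "false")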
> >
> >
> > Regards,
> > Mridul
> >
> >
> >>
> >> Here are some config changes from our work in progress.
> >>
> >> We've been trying for two weeks to get our production flow (roughly 50-70
> >> stages, with a few forks and joins and up to 20 branches in the forks) to
> >> run end to end, without any success, and we are running into other
> >> problems besides this one as well. For example, we have hit situations
> >> where saving to HDFS just hangs on a couple of tasks, which print nothing
> >> to their logs and use no CPU. For testing, our input data is 10 GB across
> >> 320 input splits and generates maybe 200-300 GB of intermediate and final
> >> data.
> >>
> >>
> >>         conf.set("spark.executor.memory", "14g")      // TODO make this configurable
> >>
> >>         // shuffle configs
> >>         conf.set("spark.default.parallelism", "320")  // TODO make this configurable
> >>         conf.set("spark.shuffle.consolidateFiles", "true")
> >>
> >>         conf.set("spark.shuffle.file.buffer.kb", "200")
> >>         conf.set("spark.reducer.maxMbInFlight", "96")
> >>
> >>         conf.set("spark.rdd.compress", "true")
> >>
> >>         // we ran into a problem with the default timeout of 60 seconds
> >>         // this is also being set in the master's spark-env.sh; not sure
> >>         // if it needs to be in both places
> >>         conf.set("spark.worker.timeout", "180")
> >>
> >>         // akka settings
> >>         conf.set("spark.akka.threads", "300")
> >>         conf.set("spark.akka.timeout", "180")
> >>         conf.set("spark.akka.frameSize", "100")
> >>         conf.set("spark.akka.batchSize", "30")
> >>         conf.set("spark.akka.askTimeout", "30")
> >>
> >>         // block manager
> >>         conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
> >>         conf.set("spark.blockManagerHeartBeatMs", "80000")
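> >>
> >> For context, those calls sit in our driver setup roughly like this (a
> >> trimmed sketch; the app name and master URL are placeholders):
> >>
> >>         import org.apache.spark.{SparkConf, SparkContext}
> >>
> >>         val conf = new SparkConf()
> >>           .setAppName("flow-test")           // placeholder
> >>           .setMaster("spark://master:7077")  // placeholder
> >>         // ... the conf.set(...) calls above ...
> >>         val sc = new SparkContext(conf)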
> >>
> >> -Suren
> >>
> >>
> >>
> >> On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pwend...@gmail.com>
> wrote:
> >>
> >>> Out of curiosity - are you guys using speculation, shuffle
> >>> consolidation, or any other non-default option? If so, that would help
> >>> narrow down what's causing this corruption.
> >>>
> >>> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
> >>> <suren.hira...@velos.io> wrote:
> >>> > Matt/Ryan,
> >>> >
> >>> > Did you make any headway on this? My team is running into this also.
> >>> > It doesn't happen on smaller datasets. Our input set is about 10 GB,
> >>> > but we generate 100s of GBs in the flow itself.
> >>> >
> >>> > -Suren
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <compton.r...@gmail.com>
> >>> > wrote:
> >>> >
> >>> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
> >>> >> cluster (no modifications to Spark or its dependencies). The error
> >>> >> appeared when running GraphX's .connectedComponents() on a ~200GB edge
> >>> >> list (GraphX worked beautifully on smaller data).
> >>> >>
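> >>> >> The failing job is essentially just this (a sketch; the edge-list path
> >>> >> is a placeholder):
> >>> >>
> >>> >>     import org.apache.spark.graphx.GraphLoader
> >>> >>
> >>> >>     // ~200GB whitespace-delimited edge list on HDFS
> >>> >>     val graph = GraphLoader.edgeListFile(sc, "hdfs:///path/to/edges")
> >>> >>     val components = graph.connectedComponents().vertices
> >>> >>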
> >>> >> Here's the stacktrace (it's quite similar to yours
> >>> >> https://imgur.com/7iBA4nJ ).
> >>> >>
> >>> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed 4 times; aborting job
> >>> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at VertexRDD.scala:100
> >>> >> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.599:39 failed 4 times, most recent failure: Exception failure in TID 29735 on host node18: java.io.StreamCorruptedException: invalid type code: AC
> >>> >>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
> >>> >>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
> >>> >>         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> >>> >>         org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> >>> >>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>> >>         org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> >>> >>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>> >>         org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
> >>> >>         org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
> >>> >>         org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
> >>> >>         org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
> >>> >>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>> >>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> >>> >>         org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> >>> >>         org.apache.spark.scheduler.Task.run(Task.scala:51)
> >>> >>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> >>> >>         java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>> >>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>> >>         java.lang.Thread.run(Thread.java:662)
> >>> >> Driver stacktrace:
> >>> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> >>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> >>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> >>> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> >>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >>> >> at scala.Option.foreach(Option.scala:236)
> >>> >> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> >>> >> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> >>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >>> >> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
> >>> >>
> >>> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
> >>> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mki...@oculusinfo.com>
> >>> >> > wrote:
> >>> >> >> I'm trying to run some Spark code on a cluster but I keep running
> >>> >> >> into a "java.io.StreamCorruptedException: invalid type code: AC"
> >>> >> >> error. My task involves analyzing ~50GB of data (some operations
> >>> >> >> involve sorting) and then writing it out to a JSON file. I'm running
> >>> >> >> the analysis on each of the data's ~10 columns and have never had a
> >>> >> >> successful run. My program seems to run for a varying amount of time
> >>> >> >> each time (between roughly 5 and 30 minutes), but it always
> >>> >> >> terminates with this error.
> >>> >> >
> >>> >> > I can tell you that this usually means that somewhere, something
> >>> >> > wrote objects to the same OutputStream with multiple
> >>> >> > ObjectOutputStreams. AC is a header value.
> >>> >> >
> >>> >> > I don't obviously see where/how that could happen, but maybe it rings
> >>> >> > a bell for someone. This could happen if an OutputStream is reused
> >>> >> > across object serializations but new ObjectOutputStreams are opened,
> >>> >> > for example.
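> >>> >> >
> >>> >> > A tiny self-contained sketch of that failure mode, outside Spark
> >>> >> > entirely (plain java.io, Scala syntax):
> >>> >> >
> >>> >> >     import java.io._
> >>> >> >
> >>> >> >     val buf = new ByteArrayOutputStream()
> >>> >> >
> >>> >> >     // Two ObjectOutputStreams over one underlying stream: each writes
> >>> >> >     // its own stream header, which begins with the 0xAC byte.
> >>> >> >     val out1 = new ObjectOutputStream(buf); out1.writeObject("first"); out1.flush()
> >>> >> >     val out2 = new ObjectOutputStream(buf); out2.writeObject("second"); out2.flush()
> >>> >> >
> >>> >> >     // A single ObjectInputStream reads the first object fine, then hits
> >>> >> >     // the second header byte where it expects a type code:
> >>> >> >     val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
> >>> >> >     in.readObject()  // "first"
> >>> >> >     in.readObject()  // java.io.StreamCorruptedException: invalid type code: AC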
> >>> >>
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> >
> >>> > SUREN HIRAMAN, VP TECHNOLOGY
> >>> > Velos
> >>> > Accelerating Machine Learning
> >>> >
> >>> > 440 NINTH AVENUE, 11TH FLOOR
> >>> > NEW YORK, NY 10001
> >>> > O: (917) 525-2466 ext. 105
> >>> > F: 646.349.4063
> >>> > E: suren.hiraman@velos.io
> >>> > W: www.velos.io
> >>>
> >>
> >>
> >>
> >> --
> >>
> >> SUREN HIRAMAN, VP TECHNOLOGY
> >> Velos
> >> Accelerating Machine Learning
> >>
> >> 440 NINTH AVENUE, 11TH FLOOR
> >> NEW YORK, NY 10001
> >> O: (917) 525-2466 ext. 105
> >> F: 646.349.4063
> >> E: suren.hiraman@velos.io
> >> W: www.velos.io
>



-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io
