Yes, that is correct. If you are executing a Spark program across multiple
machines, you need to use a distributed file system (HDFS API compatible)
for reading and writing data. In your case, your setup spans multiple
machines, so what is probably happening is that the RDD data is being
written to the worker machine's local directory (based on the checkpoint
path that has been provided), which cannot be read back as an RDD. Hence
checkpointing is failing.
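
As a rough sketch of what I mean (not tested against your cluster), you
could point the checkpoint directory at a shared file system from
spark-shell and repeat the explicit checkpoint test. The HDFS URI below is
a placeholder I made up; substitute whatever namenode host, port, and path
your installation actually uses.

```scala
// Run inside spark-shell, where `sc` is the predefined SparkContext.
// ASSUMPTION: the URI is hypothetical; use your own HDFS-compatible location.
sc.setCheckpointDir("hdfs://namenode:8020/user/spark/checkpoints")

// A small RDD with two partitions, just for the test.
val data = sc.parallelize(1 to 100, 2)

data.checkpoint()   // mark the RDD for checkpointing
data.count()        // an action is needed to trigger the checkpoint write

// Reports whether the checkpoint was actually written.
println(data.isCheckpointed)
```

For a streaming program the analogous call is
StreamingContext.checkpoint(directory), given the same kind of shared path.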

TD


On Mon, Apr 7, 2014 at 6:05 PM, Paul Mogren <pmog...@commercehub.com> wrote:

> 1.:  I will paste the full content of the environment page of the example
> application running against the cluster at the end of this message.
> 2. and 3.:  Following #2 I was able to see that the count was incorrectly
> 0 when running against the cluster, and following #3 I was able to get the
> message:
> org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[4] at count
> at <console>:15(0) has different number of partitions than original RDD
> MappedRDD[3] at textFile at <console>:12(2)
>
> I think I understand - state checkpoints and other file-exchange
> operations in a Spark cluster require a distributed/shared filesystem,
> even with just a single-host cluster and the driver/shell on a second
> host. Is that correct?
>
> Thank you,
> Paul
>
>
>
> Stages
> Storage
> Environment
> Executors
> NetworkWordCumulativeCountUpdateStateByKey application UI
> Environment
> Runtime Information
>
> Name    Value
> Java Home       /usr/lib/jvm/jdk1.8.0/jre
> Java Version    1.8.0 (Oracle Corporation)
> Scala Home
> Scala Version   version 2.10.3
> Spark Properties
>
> Name    Value
> spark.app.name  NetworkWordCumulativeCountUpdateStateByKey
> spark.cleaner.ttl       3600
> spark.deploy.recoveryMode       ZOOKEEPER
> spark.deploy.zookeeper.url      pubsub01:2181
> spark.driver.host       10.10.41.67
> spark.driver.port       37360
> spark.fileserver.uri    http://10.10.41.67:40368
> spark.home      /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2
> spark.httpBroadcast.uri http://10.10.41.67:45440
> spark.jars
>  
> /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar
> spark.master    spark://10.10.41.19:7077
> System Properties
>
> Name    Value
> awt.toolkit     sun.awt.X11.XToolkit
> file.encoding   ANSI_X3.4-1968
> file.encoding.pkg       sun.io
> file.separator  /
> java.awt.graphicsenv    sun.awt.X11GraphicsEnvironment
> java.awt.printerjob     sun.print.PSPrinterJob
> java.class.version      52.0
> java.endorsed.dirs      /usr/lib/jvm/jdk1.8.0/jre/lib/endorsed
> java.ext.dirs
> /usr/lib/jvm/jdk1.8.0/jre/lib/ext:/usr/java/packages/lib/ext
> java.home       /usr/lib/jvm/jdk1.8.0/jre
> java.io.tmpdir  /tmp
> java.library.path
> java.net.preferIPv4Stack        true
> java.runtime.name       Java(TM) SE Runtime Environment
> java.runtime.version    1.8.0-b132
> java.specification.name Java Platform API Specification
> java.specification.vendor       Oracle Corporation
> java.specification.version      1.8
> java.vendor     Oracle Corporation
> java.vendor.url http://java.oracle.com/
> java.vendor.url.bug     http://bugreport.sun.com/bugreport/
> java.version    1.8.0
> java.vm.info    mixed mode
> java.vm.name    Java HotSpot(TM) 64-Bit Server VM
> java.vm.specification.name      Java Virtual Machine Specification
> java.vm.specification.vendor    Oracle Corporation
> java.vm.specification.version   1.8
> java.vm.vendor  Oracle Corporation
> java.vm.version 25.0-b70
> line.separator
> log4j.configuration     conf/log4j.properties
> os.arch amd64
> os.name Linux
> os.version      3.5.0-23-generic
> path.separator  :
> sun.arch.data.model     64
> sun.boot.class.path
> /usr/lib/jvm/jdk1.8.0/jre/lib/resources.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/rt.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/sunrsasign.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jsse.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jce.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/charsets.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jfr.jar:/usr/lib/jvm/jdk1.8.0/jre/classes
> sun.boot.library.path   /usr/lib/jvm/jdk1.8.0/jre/lib/amd64
> sun.cpu.endian  little
> sun.cpu.isalist
> sun.io.serialization.extendedDebugInfo  true
> sun.io.unicode.encoding UnicodeLittle
> sun.java.command
>  org.apache.spark.streaming.examples.StatefulNetworkWordCount spark://
> 10.10.41.19:7077 localhost 9999
> sun.java.launcher       SUN_STANDARD
> sun.jnu.encoding        ANSI_X3.4-1968
> sun.management.compiler HotSpot 64-Bit Tiered Compilers
> sun.nio.ch.bugLevel
> sun.os.patch.level      unknown
> user.country    US
> user.dir        /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2
> user.home       /home/pmogren
> user.language   en
> user.name       pmogren
> user.timezone   America/New_York
> Classpath Entries
>
> Resource        Source
> /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
>     System Classpath
> /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/conf
>  System Classpath
> /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar
>        System Classpath
>
> http://10.10.41.67:40368/jars/spark-examples_2.10-assembly-0.9.0-incubating.jar
>        Added By User
>
> From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
> Sent: Monday, April 07, 2014 7:54 PM
> To: user@spark.apache.org
> Subject: Re: CheckpointRDD has different number of partitions than
> original RDD
>
> Few things that would be helpful.
>
> 1. Environment settings - you can find them on the environment tab in the
> Spark application UI
> 2. Are you setting the HDFS configuration correctly in your Spark program?
> For example, can you write an HDFS file from a Spark program (say
> spark-shell) to your HDFS installation and read it back into Spark (i.e.,
> create an RDD)? You can test this by writing an RDD as a text file from
> the shell, and then trying to read it back from another shell.
> 3. If that works, then let's try explicitly checkpointing an RDD. To do
> this you can take any RDD and do the following.
>
> myRDD.checkpoint()
> myRDD.count()
>
> If there is some issue, then this should reproduce the above error.
>
> TD
>
> On Mon, Apr 7, 2014 at 3:48 PM, Paul Mogren <pmog...@commercehub.com>
> wrote:
> Hello, Spark community!  My name is Paul. I am a Spark newbie, evaluating
> version 0.9.0 without any Hadoop at all, and need some help. I run into the
> following error with the StatefulNetworkWordCount example (and similarly in
> my prototype app, when I use the updateStateByKey operation).  I get this
> when running against my small cluster, but not (so far) against local[2].
>
> 61904 [spark-akka.actor.default-dispatcher-2] ERROR
> org.apache.spark.streaming.scheduler.JobScheduler - Error running job
> streaming job 1396905956000 ms.0
> org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take
> at DStream.scala:586(0) has different number of partitions than original
> RDD MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
>         at
> org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
>         at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
>         at org.apache.spark.rdd.RDD.take(RDD.scala:844)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
>         at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>         at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:744)
>
>
> Please let me know what other information would be helpful; I didn't find
> any question submission guidelines.
>
> Thanks,
> Paul
>
>
