Yes, that is correct. If you are executing a Spark program across multiple machines, then you need to use a distributed file system (one compatible with the HDFS API) for reading and writing data. Since your setup spans multiple machines, what is probably happening is that the RDD data is being written to the worker machine's local directory (based on the checkpoint path that was provided), and therefore cannot be read back as an RDD. Hence checkpointing is failing.
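As a rough sketch of the fix (the namenode host, port, and path below are placeholders, not taken from your setup), the streaming checkpoint directory should point at the shared filesystem rather than a node-local path:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CheckpointExample")
val ssc = new StreamingContext(conf, Seconds(1))

// A local path like "/tmp/checkpoints" is only visible to whichever
// worker happens to write it; every node (and the driver) must be able
// to read the checkpoint files back, so use a shared filesystem URL.
// "namenode:8020" here is a hypothetical HDFS installation.
ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoints")
```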
TD

On Mon, Apr 7, 2014 at 6:05 PM, Paul Mogren <pmog...@commercehub.com> wrote:
> 1.: I will paste the full content of the environment page of the example
> application running against the cluster at the end of this message.
> 2. and 3.: Following #2 I was able to see that the count was incorrectly
> 0 when running against the cluster, and following #3 I was able to get
> the message:
> org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[4] at count
> at <console>:15(0) has different number of partitions than original RDD
> MappedRDD[3] at textFile at <console>:12(2)
>
> I think I understand - state checkpoints and other file-exchange
> operations in a Spark cluster require a distributed/shared filesystem,
> even with just a single-host cluster and the driver/shell on a second
> host. Is that correct?
>
> Thank you,
> Paul
>
>
> Stages | Storage | Environment | Executors
> NetworkWordCumulativeCountUpdateStateByKey application UI
>
> Environment
>
> Runtime Information
>
> Name  Value
> Java Home  /usr/lib/jvm/jdk1.8.0/jre
> Java Version  1.8.0 (Oracle Corporation)
> Scala Home
> Scala Version  version 2.10.3
>
> Spark Properties
>
> Name  Value
> spark.app.name  NetworkWordCumulativeCountUpdateStateByKey
> spark.cleaner.ttl  3600
> spark.deploy.recoveryMode  ZOOKEEPER
> spark.deploy.zookeeper.url  pubsub01:2181
> spark.driver.host  10.10.41.67
> spark.driver.port  37360
> spark.fileserver.uri  http://10.10.41.67:40368
> spark.home  /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2
> spark.httpBroadcast.uri  http://10.10.41.67:45440
> spark.jars  /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar
> spark.master  spark://10.10.41.19:7077
>
> System Properties
>
> Name  Value
> awt.toolkit  sun.awt.X11.XToolkit
> file.encoding  ANSI_X3.4-1968
> file.encoding.pkg  sun.io
> file.separator  /
> java.awt.graphicsenv  sun.awt.X11GraphicsEnvironment
> java.awt.printerjob  sun.print.PSPrinterJob
> java.class.version  52.0
> java.endorsed.dirs  /usr/lib/jvm/jdk1.8.0/jre/lib/endorsed
> java.ext.dirs  /usr/lib/jvm/jdk1.8.0/jre/lib/ext:/usr/java/packages/lib/ext
> java.home  /usr/lib/jvm/jdk1.8.0/jre
> java.io.tmpdir  /tmp
> java.library.path
> java.net.preferIPv4Stack  true
> java.runtime.name  Java(TM) SE Runtime Environment
> java.runtime.version  1.8.0-b132
> java.specification.name  Java Platform API Specification
> java.specification.vendor  Oracle Corporation
> java.specification.version  1.8
> java.vendor  Oracle Corporation
> java.vendor.url  http://java.oracle.com/
> java.vendor.url.bug  http://bugreport.sun.com/bugreport/
> java.version  1.8.0
> java.vm.info  mixed mode
> java.vm.name  Java HotSpot(TM) 64-Bit Server VM
> java.vm.specification.name  Java Virtual Machine Specification
> java.vm.specification.vendor  Oracle Corporation
> java.vm.specification.version  1.8
> java.vm.vendor  Oracle Corporation
> java.vm.version  25.0-b70
> line.separator
> log4j.configuration  conf/log4j.properties
> os.arch  amd64
> os.name  Linux
> os.version  3.5.0-23-generic
> path.separator  :
> sun.arch.data.model  64
> sun.boot.class.path  /usr/lib/jvm/jdk1.8.0/jre/lib/resources.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/rt.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/sunrsasign.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jsse.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jce.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/charsets.jar:/usr/lib/jvm/jdk1.8.0/jre/lib/jfr.jar:/usr/lib/jvm/jdk1.8.0/jre/classes
> sun.boot.library.path  /usr/lib/jvm/jdk1.8.0/jre/lib/amd64
> sun.cpu.endian  little
> sun.cpu.isalist
> sun.io.serialization.extendedDebugInfo  true
> sun.io.unicode.encoding  UnicodeLittle
> sun.java.command  org.apache.spark.streaming.examples.StatefulNetworkWordCount spark://10.10.41.19:7077 localhost 9999
> sun.java.launcher  SUN_STANDARD
> sun.jnu.encoding  ANSI_X3.4-1968
> sun.management.compiler  HotSpot 64-Bit Tiered Compilers
> sun.nio.ch.bugLevel
> sun.os.patch.level  unknown
> user.country  US
> user.dir  /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2
> user.home  /home/pmogren
> user.language  en
> user.name  pmogren
> user.timezone  America/New_York
>
> Classpath Entries
>
> Resource  Source
> /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar  System Classpath
> /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/conf  System Classpath
> /home/pmogren/streamproc/spark-0.9.0-incubating-bin-hadoop2/examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar  System Classpath
> http://10.10.41.67:40368/jars/spark-examples_2.10-assembly-0.9.0-incubating.jar  Added By User
>
>
> From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
> Sent: Monday, April 07, 2014 7:54 PM
> To: user@spark.apache.org
> Subject: Re: CheckpointRDD has different number of partitions than original RDD
>
> Few things that would be helpful.
>
> 1. Environment settings - you can find them on the environment tab in
> the Spark application UI.
> 2. Are you setting the HDFS configuration correctly in your Spark
> program? For example, can you write a HDFS file from a Spark program
> (say spark-shell) to your HDFS installation and read it back into Spark
> (i.e., create an RDD)? You can test this by writing an RDD as a text
> file from the shell, and then trying to read it back from another shell.
> 3. If that works, then let's try explicitly checkpointing an RDD. To do
> this you can take any RDD and do the following.
>
> myRDD.checkpoint()
> myRDD.count()
>
> If there is some issue, then this should reproduce the above error.
>
> TD
>
> On Mon, Apr 7, 2014 at 3:48 PM, Paul Mogren <pmog...@commercehub.com> wrote:
> Hello, Spark community! My name is Paul. I am a Spark newbie, evaluating
> version 0.9.0 without any Hadoop at all, and need some help. I run into
> the following error with the StatefulNetworkWordCount example (and
> similarly in my prototype app, when I use the updateStateByKey
> operation). I get this when running against my small cluster, but not
> (so far) against local[2].
>
> 61904 [spark-akka.actor.default-dispatcher-2] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1396905956000 ms.0
> org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take at DStream.scala:586(0) has different number of partitions than original RDD MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
>         at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
>         at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
>         at org.apache.spark.rdd.RDD.take(RDD.scala:844)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:744)
>
> Please let me know what other information would be helpful; I didn't
> find any question submission guidelines.
>
> Thanks,
> Paul
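For readers hitting the same issue, the round-trip test TD suggests in the quoted message above can be sketched in spark-shell roughly like this (the "hdfs://namenode:8020" URL is a placeholder for your own HDFS installation, and `sc` is the SparkContext the shell provides):

```scala
// In spark-shell on one machine: write an RDD out as a text file on HDFS.
val rdd = sc.parallelize(1 to 100)
rdd.saveAsTextFile("hdfs://namenode:8020/tmp/roundtrip-test")

// In spark-shell on another machine: read it back and verify.
val back = sc.textFile("hdfs://namenode:8020/tmp/roundtrip-test")
back.count()  // should be 100

// If that works, try explicit checkpointing. The checkpoint directory
// must itself be on the shared filesystem.
sc.setCheckpointDir("hdfs://namenode:8020/tmp/checkpoints")
rdd.checkpoint()
rdd.count()   // materializing the RDD triggers the checkpoint write
```

If the count succeeds from the second shell but checkpointing still fails, that points at the checkpoint directory rather than HDFS connectivity.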