Hi Gourav,

The issue here is the location you are trying to write to and read from:
/Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters, all paths and resources should be reachable
from every executor (and the driver), which is why you generally use
HDFS, S3, NFS or some other shared file system.
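
For instance, once the location is shared, a write along these lines works
(a rough sketch; the s3a bucket name and the /mnt/shared mount point below
are made-up examples, not something from your setup):

# Option 1: an object store reachable from every node (this needs the
# hadoop-aws jars and credentials configured on all of them)
testdf.write.parquet("s3a://some-example-bucket/sparkdata/test2")

# Option 2: an NFS (or other network) mount that is mounted at the same
# path on the master and on both workers
testdf.write.parquet("/mnt/shared/sparkdata/test2")

# Reading back works the same way, as long as the path is shared
spark.read.parquet("/mnt/shared/sparkdata/test2").count()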

Spark assumes your data is available to all nodes; it does not try to pick
up the data from one selected node, but rather writes and reads in parallel
from the executor nodes. Also, given its scheduling logic, there is no
practical way (read: you should not care) to know which executor is running
which task.
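
If you really want to keep the result on a single machine's local disk
without a shared file system, one workaround (fine for a small test frame
like this, not for anything large) is to pull the data back to the driver
and write it from there, e.g.:

# Collect the distributed DataFrame to the driver as a pandas DataFrame and
# write it locally; this gives up Spark's parallel write, so only do it for
# data that comfortably fits in the driver's memory.
local_pdf = testdf.toPandas()
local_pdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/test2.csv",
                 index=False)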

Hope it helps,
Riccardo

On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> I am working with a native SPARK standalone cluster (
> https://spark.apache.org/docs/2.2.0/spark-standalone.html).
>
> Therefore I do not have HDFS.
>
>
> EXERCISE:
> It's the most fundamental and simple exercise: create a sample SPARK
> dataframe, write it to a location, and then read it back.
>
> SETTINGS:
> I have installed SPARK on two physical systems with the same:
> 1. SPARK version
> 2. JAVA version
> 3. PYTHON_PATH
> 4. SPARK_HOME
> 5. PYSPARK_PYTHON
> The user on both systems is the root user, so there are no
> permission issues anywhere.
>
> I am able to start:
> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
> computers)
>
> After that I can see two workers in the Spark UI (at port 8080).
>
>
> CODE:
> Then I run the following code:
>
> ======================================================
> import findspark
> import os
> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
> findspark.init()
> import pyspark
> from pyspark.sql import SparkSession
> spark = (SparkSession.builder
>         .master("spark://mastersystem.local:7077")
>         .appName("gouravtest")
>         .enableHiveSupport()
>         .getOrCreate())
> import pandas, numpy
> testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
> testdf.cache()
> testdf.count()
> testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
> spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
> ======================================================
>
>
> ERROR I (in the above code):
> Error in the line: testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
> This line does not fail or report any error. But when I look at the stage
> in the Spark application UI, the error reported for the slave node that is
> not on the same system as the master node is shown below. The write on the
> slave node that is on the same physical system as the master happens
> correctly. (NOTE: by slave node I basically mean the worker, and by master
> node the driver.)
> ----------------------------------------------------------------------
>
> 0 (TID 41). 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000006_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000006
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000006_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000028_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000028
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000028_0: Committed
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000021_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000021
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000021_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID 45). 
> 2103 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID 37). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID 39). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000018_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000018
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000018_0: Committed
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000029_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000029
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000029_0: Committed
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000027_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000027
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000027_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 21.0 in stage 2.0 (TID 54). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000010_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000010
> 17/07/31 00:19:29 INFO Executor: Finished task 19.0 in stage 2.0 (TID 52). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000010_0: Committed
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000030_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000030
> 17/07/31 00:19:29 INFO Executor: Finished task 22.0 in stage 2.0 (TID 55). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000030_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 20.0 in stage 2.0 (TID 53). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 28.0 in stage 2.0 (TID 61). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000016_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000016
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000016_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 26.0 in stage 2.0 (TID 59). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 18.0 in stage 2.0 (TID 51). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000024_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000024
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000024_0: Committed
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_000023_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000023
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_000023_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 29.0 in stage 2.0 (TID 62). 
> 2103 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 10.0 in stage 2.0 (TID 43). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 16.0 in stage 2.0 (TID 49). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 27.0 in stage 2.0 (TID 60). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 30.0 in stage 2.0 (TID 63). 
> 2103 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 23.0 in stage 2.0 (TID 56). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 24.0 in stage 2.0 (TID 57). 
> 2060 bytes result sent to driver
> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 65
> 17/07/31 00:20:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 65)
> 17/07/31 00:20:23 INFO TorrentBroadcast: Started reading broadcast variable 3
> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes 
> in memory (estimated size 24.9 KB, free 365.9 MB)
> 17/07/31 00:20:23 INFO TorrentBroadcast: Reading broadcast variable 3 took 10 
> ms
> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3 stored as values in 
> memory (estimated size 70.3 KB, free 365.9 MB)
> 17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)
> java.io.FileNotFoundException: File 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
>  does not exist
>       at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
>       at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
>       at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
>       at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>       at 
> org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
>       at 
> org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
>       at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>       at 
> org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
>       at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
>       at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
>       at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:491)
>       at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
>       at 
> scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
>       at 
> scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
>       at 
> scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>       at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>       at 
> scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>       at 
> scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
>       at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
>       at 
> scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
>       at 
> scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
>       at 
> scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
>       at 
> scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>       at 
> scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>       at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>       at 
> scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>       at 
> scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 66
>
>
> ----------------------------------------------------------------------
>
>
> ERROR II (in the above code):
> While trying to read the data back, a separate error is now thrown, which
> similarly says that the files do not exist.
>
> Also, why is SPARK looking for the same files on both systems? If the
> same path on the two systems contains different files, should SPARK not
> combine them and work on all of them?
>
>
>
> NOW DEMONSTRATING THAT THIS IS AN ERROR IN SPARK 2.x
> I started Spark using the same method, but now with SPARK 1.5, and this
> does not give any error:
> ======================================================
> import findspark
> import os
> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
> findspark.init()
> import pyspark
>
> sc = pyspark.SparkContext("spark://Gouravs-iMac.local:7077", "test")
> sqlContext = pyspark.SQLContext(sc)
> import pandas, numpy
> testdf = sqlContext.createDataFrame(pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
> testdf.cache()
> testdf.count()
> testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test3")
> sqlContext.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test3").count()
> ======================================================
>
> I would be sincerely obliged if someone could kindly help me out with this
> issue and point out my mistakes/assumptions.
>
>
>
>
> Regards,
> Gourav Sengupta
>
