[ https://issues.apache.org/jira/browse/SPARK-13377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15153426#comment-15153426 ]

Jey Kottalam commented on SPARK-13377:
--------------------------------------

Zhao and I looked into this issue this afternoon and believe it is a quirk of 
BinaryFileRDD. The visible symptom is that when reading with sc.binaryFiles, 
each partition advertises only one of a block's replicas as a preferred 
location, so the I/O load is not evenly distributed across the replicas.
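
For context: sc.binaryFiles builds a BinaryFileRDD whose splits come from 
StreamFileInputFormat, a subclass of Hadoop's CombineFileInputFormat, so each 
partition is backed by a CombineFileSplit rather than a plain FileSplit, and 
the combined split appears to surface only one host per block. A minimal 
sketch of the comparison we ran (assumes a live SparkContext sc and the 
/user/root/1.txt test file from the report):
{quote}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// BinaryFileRDD: splits built by StreamFileInputFormat (a CombineFileInputFormat).
val binary = sc.binaryFiles("/user/root/1.txt")

// NewHadoopRDD: splits are plain FileSplits.
val hadoop = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
  "/user/root/1.txt", classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text])

// Print the hosts each RDD considers local: in our test the first line
// shows a single host per partition, the second shows both HDFS replicas.
println(binary.partitions.map(binary.preferredLocations).toSeq)
println(hadoop.partitions.map(hadoop.preferredLocations).toSeq)
{quote}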

> binaryFileRDD preferredLocations issue
> --------------------------------------
>
>                 Key: SPARK-13377
>                 URL: https://issues.apache.org/jira/browse/SPARK-13377
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 1.6.0
>         Environment: EC2 cluster started with spark-ec2 script
>            Reporter: Zhao Zhang
>            Priority: Blocker
>
> I am using the sc.binaryFiles() interface to load a directory of small files 
> into Spark. Each input file has two replicas in HDFS, as shown below:
> {quote}
> [root@ip-172-31-18-231 python]$ ~/ephemeral-hdfs/bin/hdfs fsck 
> /user/root/1.txt -files -blocks -locations
> Connecting to namenode via 
> http://ec2-54-183-233-19.us-west-1.compute.amazonaws.com:50070
> FSCK started by root (auth:SIMPLE) from /172.31.18.231 for path 
> /user/root/1.txt at Thu Feb 18 23:30:04 UTC 2016
> /user/root/1.txt 12 bytes, 1 block(s):  OK
> 0. BP-1035722345-172.31.18.231-1455830537596:blk_7363655817569260238_1002 
> len=12 repl=2 [172.31.18.204:50010, 172.31.30.137:50010]
> {quote}
> However, in the Spark REPL, the preferredLocations() method returns only 
> one address instead of two:
> {quote}
> scala> val r = sc.binaryFiles("1.txt")
> r: org.apache.spark.rdd.RDD[(String, 
> org.apache.spark.input.PortableDataStream)] = 1.txt BinaryFileRDD[1] at 
> binaryFiles at <console>:21
> scala> r.partitions.map(r.preferredLocations(_))
> res1: Array[Seq[String]] = 
> Array(WrappedArray(ip-172-31-18-204.us-west-1.compute.internal))
> {quote}
> We tried reading the file with sc.newAPIHadoopFile(), and that method works 
> correctly:
> {quote}
> scala> val r = sc.newAPIHadoopFile[LongWritable, Text, 
> TextInputFormat]("/user/root/1.txt", classOf[TextInputFormat], 
> classOf[LongWritable], classOf[Text])
> r: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, 
> org.apache.hadoop.io.Text)] = /user/root/1.txt NewHadoopRDD[0] at 
> newAPIHadoopFile at <console>:24
> scala> r.partitions.map(r.preferredLocations(_))
> res0: Array[Seq[String]] = 
> Array(WrappedArray(ip-172-31-18-204.us-west-1.compute.internal, 
> ip-172-31-30-137.us-west-1.compute.internal))
> {quote}
> We traced the source code down from sc.binaryFiles() to BinaryFileRDD but 
> could not find a quick fix. Please help accordingly; a sketch of the 
> interim workaround we are considering is below.
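> As a possible interim workaround (an untested sketch building on the 
> observation above), we may read each small file through 
> sc.newAPIHadoopFile, which reports the full replica set, and union the 
> per-file RDDs:
> {quote}
> import org.apache.hadoop.io.{LongWritable, Text}
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
>
> // Hypothetical list of input paths (one RDD per small file).
> val paths = Seq("/user/root/1.txt", "/user/root/2.txt")
> val perFile = paths.map { p =>
>   sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
>     p, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
>     // Copy out of the reused Text object before the record is overwritten.
>     .map { case (_, line) => (p, line.toString) }
> }
> val all = sc.union(perFile)
> {quote}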


