[ https://issues.apache.org/jira/browse/SPARK-13377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15153426#comment-15153426 ]
Jey Kottalam commented on SPARK-13377: -------------------------------------- Zhao and I looked into this this afternoon and believe this is a quirk of BinaryFilesRDD. The visible symptom is that when reading with sc.binaryFiles, the I/O load is not evenly distributed across the various replicas. > binaryFileRDD preferredLocations issue > -------------------------------------- > > Key: SPARK-13377 > URL: https://issues.apache.org/jira/browse/SPARK-13377 > Project: Spark > Issue Type: Bug > Components: Input/Output > Affects Versions: 1.6.0 > Environment: EC2 cluster started with spark-ec2 script > Reporter: Zhao Zhang > Priority: Blocker > > I am using the sc.binaryFiles() interface to load a directory of small files > into Spark. Each input file has two replicas in HDFS, as shown below: > {quote} > root@ip-172-31-18-231 python]$ ~/ephemeral-hdfs/bin/hdfs fsck > /user/root/1.txt -files -blocks -locations > Connecting to namenode via > http://ec2-54-183-233-19.us-west-1.compute.amazonaws.com:50070 > FSCK started by root (auth:SIMPLE) from /172.31.18.231 for path > /user/root/1.txt at Thu Feb 18 23:30:04 UTC 2016 > /user/root/1.txt 12 bytes, 1 block(s): OK > 0. BP-1035722345-172.31.18.231-1455830537596:blk_7363655817569260238_1002 > len=12 repl=2 [172.31.18.204:50010, 172.31.30.137:50010] > {quote} > However, in Spark REPL, the preferredLocations() method only return one > address instead of two: > {quote} > scala> val r = sc.binaryFiles("1.txt") > r: org.apache.spark.rdd.RDD[(String, > org.apache.spark.input.PortableDataStream)] = 1.txt BinaryFileRDD[1] at > binaryFiles at <console>:21 > scala> r.partitions.map(r.preferredLocations(_)) > res1: Array[Seq[String]] = > Array(WrappedArray(ip-172-31-18-204.us-west-1.compute.internal)) > {quote} > We try read the file with sc.newAPIHadoopFile(), and this method works > correctly: > {quote} > scala> val r = sc.newAPIHadoopFile[LongWritable, Text, > TextInputFormat]("/user/root/1.txt", classOf[TextInputFormat], > classOf[LongWritable], classOf[Text]) > r: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, > org.apache.hadoop.io.Text)] = /user/root/1.txt NewHadoopRDD[0] at > newAPIHadoopFile at <console>:24 > scala> r.partitions.map(r.preferredLocations(_)) > res0: Array[Seq[String]] = > Array(WrappedArray(ip-172-31-18-204.us-west-1.compute.internal, > ip-172-31-30-137.us-west-1.compute.internal)) > {quote} > We trace the source code down from sc.binaryFiles() to binaryFileRDD, but can > not find a quick fix. Please help accordingly. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org