[ https://issues.apache.org/jira/browse/SPARK-20031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932516#comment-15932516 ]

Rakesh Kumar Dash commented on SPARK-20031:
-------------------------------------------

Similar behaviour is also observed when any of sc.wholeTextFiles, sc.binaryFiles or sc.textFile is combined with toDebugString or reduceByKey!
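One way to see why such different calls behave alike is a toy model of an RDD that memoises its partition list. The class below is hypothetical (`ToyFileRDD`, `listingCalls` and `defaultShuffleWidth` are illustrative names, not Spark API). It assumes, as the stack trace in the issue suggests, that computing the partitions means listing every input file on the driver. It also assumes that both a descriptive call like toDebugString and a shuffle like reduceByKey (when no partition count is given) ask for the parent's partitions.

```scala
// Toy model (NOT Spark's real RDD classes): an RDD-like object whose
// partition list is computed on first access, in the caller's thread,
// and cached afterwards.
final class ToyFileRDD(files: Seq[String], minPartitions: Int) {
  private var calls = 0

  // How many times the expensive listing step has actually run.
  def listingCalls: Int = calls

  // Stand-in for getPartitions: in the reported stack trace this is where
  // every input file gets listed (CombineFileInputFormat.getSplits).
  private def computePartitions(): Vector[Seq[String]] = {
    calls += 1
    val perPartition = math.max(1, math.ceil(files.size.toDouble / minPartitions).toInt)
    files.grouped(perPartition).toVector
  }

  // Memoised, like RDD.partitions: the first caller pays the full cost.
  lazy val partitions: Vector[Seq[String]] = computePartitions()

  // Purely descriptive, yet it needs the partition count (cf. toDebugString).
  def toDebugString: String = s"(${partitions.length}) ToyFileRDD[${files.size} files]"

  // ASSUMPTION: a shuffle without an explicit partition count takes its
  // width from the parent's partitions (cf. reduceByKey's default).
  def defaultShuffleWidth: Int = partitions.length
}

object ToyFileRDDDemo {
  def main(args: Array[String]): Unit = {
    val rdd = new ToyFileRDD((1 to 10).map(i => s"file$i.txt"), minPartitions = 3)
    println(s"listings before any call: ${rdd.listingCalls}")
    println(rdd.toDebugString)       // first access: runs the listing
    println(rdd.defaultShuffleWidth) // cached: no second listing
    println(s"listings after both calls: ${rdd.listingCalls}")
  }
}
```

Nothing in the model is distributed: whichever call touches `partitions` first pays for the whole listing on its own thread. That would be consistent with the driver being busy while the Spark UI shows no jobs.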

> sc.wholeTextFiles + toDebugString takes long time even before action is performed
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-20031
>                 URL: https://issues.apache.org/jira/browse/SPARK-20031
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 1.6.0, 1.6.2, 1.6.3
>         Environment: We are using a Spark standalone cluster with two workers. The underlying file system is NFS.
>            Reporter: Rakesh Kumar Dash
>            Priority: Critical
>
> Below is a simple code segment.
> inputForCust contains 14762 files totalling only 57M, with an average file size of 0.5K. The files are loaded from a local filesystem mounted over NFS. In our production environment we have many more files, and toDebugString takes 2 hours!
>     val inputCustFiles = sc.wholeTextFiles(inputForCust, jobArgs.minPartitions)
>     println("This prints immediately")
>     inputCustFiles.toDebugString
>     println("This prints after 20 mins")
>     inputCustFiles.count
>     println("This prints after 10 mins")
> Note: Our real job had some complex transformations after wholeTextFiles, and the time was spent in reduceByKey! I have simplified the code just to reproduce the problem.
> **My question is: why is inputCustFiles.toDebugString taking so much time?**
> If inputCustFiles.count takes time, I can at least be sure it is taking advantage of the cluster's processing power. But inputCustFiles.toDebugString blocks the driver!
> During those 20 minutes, I see no activity in the Spark UI.
> If I enable TRACE-level logging, I see the lines below:
>     [error] [17/03/17 23:23:27] [DEBUG] BlockManager: Getting local block broadcast_1
>     [error] [17/03/17 23:23:27] [DEBUG] BlockManager: Level for block broadcast_1 is StorageLevel(true, true, false, true, 1)
>     [error] [17/03/17 23:23:27] [DEBUG] BlockManager: Getting block broadcast_1 from memory
>     [error] [17/03/17 23:23:43] [TRACE] HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
>     [error] [17/03/17 23:24:43] [TRACE] HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
>     
> :::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
> **Any idea whether I am doing anything wrong, or is this a limitation/bug/design decision in Spark?**
> Note:
> - We are using Spark 1.6.2.
> - The time taken by toDebugString grows as the number of input files grows!
> Below is the driver's stack trace while it is blocked:
>     java.io.FileInputStream.readBytes(Native Method)
>     java.io.FileInputStream.read(FileInputStream.java:255)
>     java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
>     java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>     sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
>     sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
>     sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>     java.io.InputStreamReader.read(InputStreamReader.java:184)
>     java.io.BufferedReader.fill(BufferedReader.java:161)
>     java.io.BufferedReader.read1(BufferedReader.java:212)
>     java.io.BufferedReader.read(BufferedReader.java:286)
>     org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:602)
>     org.apache.hadoop.util.Shell.runCommand(Shell.java:446)
>     org.apache.hadoop.util.Shell.run(Shell.java:379)
>     org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
>     org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
>     org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
>     org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
>     org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:567)
>     org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getPermission(RawLocalFileSystem.java:542)
>     org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:42)
>     org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1815)
>     org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
>     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:267)
>     org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
>     org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:49)
>     org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>     org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>     scala.Option.getOrElse(Option.scala:121)
>     org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>     org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>     org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>     org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>     scala.Option.getOrElse(Option.scala:121)
>     org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>     org.apache.spark.rdd.RDD.firstDebugString$1(RDD.scala:1747)
>     org.apache.spark.rdd.RDD.toDebugString(RDD.scala:1781)
>     oculus.storeonce.spark.Test$.main(Test.scala:11)
>     oculus.storeonce.spark.Test.main(Test.scala)
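The frames under FileInputFormat.listStatus show Hadoop's RawLocalFileSystem calling Shell.execCommand from loadPermissionInfo, that is, starting an external process to read a file's permissions. If that happens once per file, the cost scales with the file count: the reported 20 minutes for 14762 files works out to about 1200 s / 14762 ≈ 81 ms per file. The sketch below contrasts one child process per file with an in-process java.nio lookup. It is an illustration under assumptions, not Hadoop's exact code path: the command is modelled as `ls -ld <file>` on a POSIX system, and `PermissionLookupDemo`, `permsViaShell` and `permsViaNio` are hypothetical names.

```scala
import java.nio.file.{Files, Path}
import java.nio.file.attribute.PosixFilePermissions
import scala.sys.process._

object PermissionLookupDemo {
  // One child process per file, modelled on the pattern in the stack trace
  // (Shell.execCommand under RawLocalFileStatus.loadPermissionInfo).
  // ASSUMPTION: the command is `ls -ld <file>`; we keep the 9 rwx characters.
  def permsViaShell(files: Seq[Path]): Seq[String] =
    files.map { f =>
      val line = Seq("ls", "-ld", f.toString).!! // forks and waits for `ls`
      line.trim.slice(1, 10)                     // drop the file-type char
    }

  // In-process alternative: read POSIX permissions via java.nio, no fork.
  def permsViaNio(files: Seq[Path]): Seq[String] =
    files.map(f => PosixFilePermissions.toString(Files.getPosixFilePermissions(f)))

  // Returns the result of `body` and the wall-clock milliseconds it took.
  private def timeMs[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000)
  }

  def main(args: Array[String]): Unit = {
    val n = if (args.nonEmpty) args(0).toInt else 500
    val dir = Files.createTempDirectory("perm-demo")
    val files = (1 to n).map(i => Files.createFile(dir.resolve(s"f$i.txt")))

    val (viaShell, shellMs) = timeMs(permsViaShell(files))
    val (viaNio, nioMs) = timeMs(permsViaNio(files))

    println(s"$n files: one process per file = $shellMs ms, java.nio = $nioMs ms")
    println(s"same permissions reported: ${viaShell == viaNio}")

    // Clean up the temporary files and directory.
    files.foreach(f => Files.delete(f))
    Files.delete(dir)
  }
}
```

Only the relative gap between the two timings is meaningful; absolute numbers will vary with the machine and, on NFS, with the mount.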



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
