[
https://issues.apache.org/jira/browse/SPARK-20031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932516#comment-15932516
]
Rakesh Kumar Dash commented on SPARK-20031:
-------------------------------------------
Similar behaviour is observed when sc.wholeTextFiles, sc.binaryFiles or sc.textFile is combined with toDebugString or reduceByKey.
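One plausible reading of the stack trace in the issue below: toDebugString prints each RDD's partition count, so it evaluates RDD.partitions. The first evaluation calls getPartitions, which for WholeTextFileRDD lists every input file on the driver. The result is then cached on the RDD, so a later action should not need to list again. reduceByKey without an explicit partitioner appears to take the same path, since (in our reading of Spark 1.6) its default partitioner is sized from the parent's partition count. The Scala sketch below models that lazy, memoized behaviour without Spark. ModelRDD, listInputFiles and defaultNumReducers are hypothetical names, not Spark APIs.

```scala
// Hypothetical model of Spark's lazy, memoized RDD.partitions (not Spark code).
// `listInputFiles` stands in for getPartitions, which for WholeTextFileRDD
// lists every input file on the driver.
final class ModelRDD(listInputFiles: () => Seq[String]) {
  private var calls = 0                            // how often the listing ran
  private var cached: Option[Array[String]] = None // memoized partitions

  def listingCalls: Int = calls

  // Like RDD.partitions: compute on first access, then reuse the cached result.
  def partitions: Array[String] = cached.getOrElse {
    calls += 1
    val parts = listInputFiles().toArray
    cached = Some(parts)
    parts
  }

  // Like toDebugString: it only describes the lineage, but it prints the
  // partition count, so it forces the listing if nothing has done so yet.
  def toDebugString: String = s"(${partitions.length}) ModelRDD"

  // Like reduceByKey without an explicit partitioner (assumption based on
  // Spark 1.6): the default reducer count comes from the parent's partitions.
  def defaultNumReducers: Int = partitions.length
}

val rdd = new ModelRDD(() => Seq("part-1.txt", "part-2.txt", "part-3.txt"))
println(rdd.listingCalls)       // 0: defining the RDD lists nothing
println(rdd.toDebugString)      // (3) ModelRDD: the listing runs here, on the driver
println(rdd.listingCalls)       // 1
println(rdd.defaultNumReducers) // 3, taken from the cached partitions
println(rdd.listingCalls)       // still 1
```

In this model the expensive step happens exactly once, at the first call that needs the partition count, which matches the report: the RDD definition returns immediately and the driver blocks inside toDebugString.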
> sc.wholeTextFiles + toDebugString takes long time even before action is performed
> ---------------------------------------------------------------------------------
>
> Key: SPARK-20031
> URL: https://issues.apache.org/jira/browse/SPARK-20031
> Project: Spark
> Issue Type: Bug
> Components: Input/Output, Spark Core
> Affects Versions: 1.6.0, 1.6.2, 1.6.3
> Environment: We are using a Spark standalone cluster with two workers. The underlying file system is NFS.
> Reporter: Rakesh Kumar Dash
> Priority: Critical
>
> Below is a simple code segment.
> inputForCust contains 14762 files totalling only 57 MB, with an average file size of 0.5 KB. The files are read from a local filesystem path mounted over NFS. In our production environment we have many files, and there toDebugString takes 2 hours!
>     val inputCustFiles = sc.wholeTextFiles(inputForCust, jobArgs.minPartitions)
>     println("This prints immediately")
>     inputCustFiles.toDebugString
>     println("This prints after 20 mins")
>     inputCustFiles.count
>     println("This prints after 10 mins")
> Note: Our real job applies some complex transformations after wholeTextFiles, and there the time was spent in reduceByKey. I have simplified the code to the minimum needed to reproduce the problem.
> **My question is: why does inputCustFiles.toDebugString take so much time?**
> If inputCustFiles.count took this long, I could at least be sure it was using the cluster's processing power. But inputCustFiles.toDebugString blocks the driver!
> During those 20 minutes I see no activity in the Spark UI.
> With TRACE-level logging enabled, I see the following lines:
> [error] [17/03/17 23:23:27] [DEBUG] BlockManager: Getting local block broadcast_1
> [error] [17/03/17 23:23:27] [DEBUG] BlockManager: Level for block broadcast_1 is StorageLevel(true, true, false, true, 1)
> [error] [17/03/17 23:23:27] [DEBUG] BlockManager: Getting block broadcast_1 from memory
> [error] [17/03/17 23:23:43] [TRACE] HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
> [error] [17/03/17 23:24:43] [TRACE] HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
>
> :::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
> **Any idea whether I am doing something wrong, or is this a limitation, bug, or design decision in Spark?**
> Note:
> - We are using 1.6.2.
> - The time taken by toDebugString grows with the number of input files.
> Below is the driver's stack trace while it is blocked:
> java.io.FileInputStream.readBytes(Native Method)
> java.io.FileInputStream.read(FileInputStream.java:255)
> java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
> java.io.BufferedInputStream.read(BufferedInputStream.java:345)
> sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
> sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
> sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
> java.io.InputStreamReader.read(InputStreamReader.java:184)
> java.io.BufferedReader.fill(BufferedReader.java:161)
> java.io.BufferedReader.read1(BufferedReader.java:212)
> java.io.BufferedReader.read(BufferedReader.java:286)
> org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:602)
> org.apache.hadoop.util.Shell.runCommand(Shell.java:446)
> org.apache.hadoop.util.Shell.run(Shell.java:379)
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
> org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
> org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
> org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
> org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:567)
> org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getPermission(RawLocalFileSystem.java:542)
> org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:42)
> org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1815)
> org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:267)
> org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
> org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:49)
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> scala.Option.getOrElse(Option.scala:121)
> org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> scala.Option.getOrElse(Option.scala:121)
> org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> org.apache.spark.rdd.RDD.firstDebugString$1(RDD.scala:1747)
> org.apache.spark.rdd.RDD.toDebugString(RDD.scala:1781)
> oculus.storeonce.spark.Test$.main(Test.scala:11)
> oculus.storeonce.spark.Test.main(Test.scala)
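The frames above show where the driver spends its time: CombineFileInputFormat.getSplits calls FileInputFormat.listStatus; constructing each LocatedFileStatus asks for the file's permissions; and RawLocalFileSystem obtains them by running an external shell command (FileUtil.execCommand, then Shell.runCommand). That is one process launch per file, on an NFS mount, which would be consistent with the delay growing with the number of input files. The sketch below models that per-file cost and derives the average latency implied by the figures in this report (14762 files, about 20 minutes), assuming the whole delay is spent in those lookups. ListingCostModel, listWithPermissions and lookupPermission are hypothetical names, not Hadoop APIs.

```scala
// Hypothetical model, not Hadoop code: listing N files performs N permission
// lookups, because each LocatedFileStatus constructor calls getPermission(),
// which RawLocalFileSystem implements with an external shell command.
object ListingCostModel {
  // `lookupPermission` stands in for that per-file shell command.
  def listWithPermissions(
      files: Seq[String],
      lookupPermission: String => String): Seq[(String, String)] =
    files.map(f => f -> lookupPermission(f))

  // Average per-file latency (ms) implied by an observed total time,
  // assuming (an approximation) that all of it is spent in per-file lookups.
  def impliedPerFileMillis(numFiles: Int, observedSeconds: Double): Double =
    observedSeconds * 1000.0 / numFiles
}

var lookupCount = 0
ListingCostModel.listWithPermissions(
  Seq("a.txt", "b.txt", "c.txt"),
  _ => { lookupCount += 1; "rw-r--r--" })
println(lookupCount) // 3: one lookup per listed file

// Figures from this report: 14762 files, toDebugString blocked for ~20 minutes.
val perFileMs = ListingCostModel.impliedPerFileMillis(14762, 20 * 60.0)
println(f"$perFileMs%.1f ms per file") // roughly 81 ms per file
```

Under this assumption each lookup costs roughly 81 ms, which is slow for a local stat call but plausible for launching a process against an NFS-backed path.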
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)