Ahhh---forgive my typo: what I mean is, (t2 - t1) >= (t_ser + t_deser + t_exec) is satisfied, empirically.
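
In case it is useful to anyone else, the check can be reproduced along these lines (a sketch only: it uses the 1.4 DataFrame JSON reader on the event log, and the path and field names are simply what I see in the event-log JSON, so treat them as assumptions):

val events = sqlContext.read.json("file:///path/to/app.log")  // hypothetical event-log path
val taskEnds = events.filter(events("Event") === "SparkListenerTaskEnd")
val t = taskEnds.select(
  taskEnds("Task Info").getField("Launch Time").as("t1"),
  taskEnds("Task Info").getField("Finish Time").as("t2"),
  taskEnds("Task Metrics").getField("Executor Deserialize Time").as("t_deser"),
  taskEnds("Task Metrics").getField("Executor Run Time").as("t_exec"),
  taskEnds("Task Metrics").getField("Result Serialization Time").as("t_ser"))
// Tasks whose wall-clock span is shorter than the sum of the components;
// per the observation above, this count comes out to zero.
t.filter(t("t2") - t("t1") < t("t_deser") + t("t_exec") + t("t_ser")).count()
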
On 6/10/15, Mike Hynes <91m...@gmail.com> wrote:
> Hi Imran,
>
> Thank you for your email.
>
> In examining the condition (t2 - t1) < (t_ser + t_deser + t_exec), I
> have found it to be true, although I have not included
> t_{wait_for_read} in this, since it has---so far as I can tell---been
> either zero or negligible compared to the task time.
>
> Thanks,
> Mike
>
> On 6/8/15, Imran Rashid <iras...@cloudera.com> wrote:
>> Hi Mike,
>>
>> All good questions; let me take a stab at answering them:
>>
>> 1. Event Logs + Stages:
>>
>> It's normal for stages to get skipped if they are shuffle map stages,
>> which get read multiple times. E.g., here's a little example program I
>> wrote earlier to demonstrate this: "d3" doesn't need to be re-shuffled
>> since each time it's read with the same partitioner. So skipping stages
>> in this way is a good thing:
>>
>> val partitioner = new org.apache.spark.HashPartitioner(10)
>> val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x }.partitionBy(partitioner)
>> (0 until 5).foreach { idx =>
>>   val otherData = sc.parallelize(1 to (idx * 100)).map { x => (x % 10) -> x }.partitionBy(partitioner)
>>   println(idx + " ---> " + otherData.join(d3).count())
>> }
>>
>> If you run this and look in the UI, you'll see that all jobs except for
>> the first one have one stage that is skipped. You will also see this in
>> the log:
>>
>> 15/06/08 10:52:37 INFO DAGScheduler: Parents of final stage: List(Stage 12, Stage 13)
>> 15/06/08 10:52:37 INFO DAGScheduler: Missing parents: List(Stage 13)
>>
>> Admittedly that is not very clear, but it is sort of indicating to you
>> that the DAGScheduler first created stage 12 as a necessary step, and
>> then later on changed its mind by realizing that everything it needed
>> for stage 12 already existed, so there was nothing to do.
>>
>> 2. Extracting Event Log Information
>>
>> Maybe you are interested in SparkListener? Unfortunately, I don't know
>> of a good blog post describing it; hopefully the docs are clear ...
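>>
>> As a rough, untested sketch (the listener API is a DeveloperApi, so
>> double-check the names against your Spark version), a listener that
>> dumps one delimited row per finished task could look like this:
>>
>> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>>
>> // Prints one comma-separated row per completed task:
>> // finish time (ms), task ID, host, executor run time (ms), GC time (ms).
>> class TaskCsvListener extends SparkListener {
>>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
>>     val info = taskEnd.taskInfo
>>     val metrics = taskEnd.taskMetrics  // may be null for failed tasks
>>     if (info != null && metrics != null) {
>>       println(Seq(info.finishTime, info.taskId, info.host,
>>         metrics.executorRunTime, metrics.jvmGCTime).mkString(","))
>>     }
>>   }
>> }
>>
>> // Register it on the driver before running jobs:
>> sc.addSparkListener(new TaskCsvListener)
>>
>> You can also register a listener class through the spark.extraListeners
>> conf if you give it a zero-argument constructor.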
>>
>> 3. Time Metrics in Spark Event Log
>>
>> This is a great question. I *think* the only exception is that t_gc is
>> really overlapped with t_exec. So I think you should really expect
>>
>> (t2 - t1) < (t_ser + t_deser + t_exec)
>>
>> I am not 100% sure about this, though. I'd be curious if that
>> constraint was ever violated.
>>
>> As for your question on shuffle read vs. shuffle write time -- I wouldn't
>> necessarily expect the same stage to have times for both shuffle read &
>> shuffle write -- in the simplest case, you'll have shuffle write times in
>> one, and shuffle read times in the next one. But even taking that into
>> account, there is a difference in the way they work & are measured.
>> Shuffle read operations are pipelined, and the way we measure shuffle
>> read, it's just how much time is spent *waiting* for network transfer.
>> It could be that there is no (measurable) wait time because the next
>> blocks are fetched before they are needed. Shuffle writes occur in the
>> normal task execution thread, though, so we (try to) measure all of it.
>>
>> On Sun, Jun 7, 2015 at 11:12 PM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> Hi Patrick and Akhil,
>>>
>>> Thank you both for your responses. This is a bit of an extended email,
>>> but I'd like to:
>>> 1. Answer your (Patrick's) note about the "missing" stages, since the
>>> IDs do (briefly) appear in the event logs
>>> 2. Ask for advice/experience with extracting information from the
>>> event logs in a columnar, delimiter-separated format
>>> 3. Ask about the time metrics reported in the event logs; currently,
>>> the elapsed time for a task does not equal the sum of the times for
>>> its components
>>>
>>> 1. Event Logs + Stages:
>>> =========================
>>>
>>> As I said before, in the Spark logs (the log4j-configurable ones from
>>> the driver), I only see references to some stages, where the stage IDs
>>> are not arithmetically increasing. In the event logs, however, I will
>>> see reference to *every* stage, although not all stages will have
>>> tasks associated with them.
>>>
>>> For instance, if you examine only the stages that actually have tasks,
>>> you can see missing stages:
>>> # grep -E '"Event":"SparkListenerTaskEnd"' app.log \
>>> #   | grep -Eo '"Stage ID":[[:digit:]]+' \
>>> #   | sort -n | uniq | head -n 5
>>> "Stage ID":0
>>> "Stage ID":1
>>> "Stage ID":10
>>> "Stage ID":11
>>> "Stage ID":110
>>>
>>> However, these "missing" stages *do* appear in the event logs as Stage
>>> IDs in the jobs submitted, i.e., for
>>> # grep -E '"Event":"SparkListenerJobStart"' app.log | grep -Eo 'Stage IDs":\[.*\]' | head -n 5
>>> Stage IDs":[0,1,2]
>>> Stage IDs":[5,3,4]
>>> Stage IDs":[6,7,8]
>>> Stage IDs":[9,10,11]
>>> Stage IDs":[12,13,14]
>>>
>>> I do not know if this amounts to a bug, since I am not familiar with
>>> the scheduler in detail. The stages have seemingly been created
>>> somewhere in the DAG, but then have no associated tasks and never
>>> appear again.
>>>
>>> 2. Extracting Event Log Information
>>> ====================================
>>> Currently we are running scalability tests, and are finding very poor
>>> scalability for certain block matrix algorithms. I would like to have
>>> finer detail about the communication time and bandwidth when data is
>>> transferred between nodes.
>>>
>>> I would really just like to have a file with nothing but task info in
>>> a format such as:
>>> timestamp (ms), task ID, hostname, execution time (ms), GC time (ms), ...
>>> 0010294, 1, slave-1, 503, 34, ...
>>> 0010392, 2, slave-2, 543, 32, ...
>>> and similarly for jobs/stages/rdd_memory/shuffle output/etc.
>>>
>>> I have extracted the relevant time fields from the Spark event logs
>>> with a sed script, but I wonder if there is an even more expedient
>>> way. Unfortunately, I do not immediately see how to do this using the
>>> $SPARK_HOME/conf/metrics.properties file and haven't come across a
>>> blog/etc. that describes this. Could anyone please comment on whether
>>> or not a metrics configuration for this already exists?
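>>>
>>> (To be concrete about the table I'm after, here is a rough sketch of
>>> such an extraction in plain Scala, i.e. a stand-in for my sed script;
>>> the path and field names are just what I see in the event-log JSON, so
>>> treat them as assumptions:)
>>>
>>> import scala.io.Source
>>> import scala.util.parsing.json.JSON
>>>
>>> // One CSV row per SparkListenerTaskEnd event:
>>> // finish timestamp (ms), task ID, hostname, execution time (ms), GC time (ms)
>>> val logPath = "/path/to/app.log"  // hypothetical path
>>> for (line <- Source.fromFile(logPath).getLines(); parsed <- JSON.parseFull(line)) {
>>>   val event = parsed.asInstanceOf[Map[String, Any]]
>>>   // failed tasks may lack "Task Metrics", so they are skipped here
>>>   if (event.get("Event") == Some("SparkListenerTaskEnd") && event.contains("Task Metrics")) {
>>>     val info = event("Task Info").asInstanceOf[Map[String, Any]]
>>>     val metrics = event("Task Metrics").asInstanceOf[Map[String, Any]]
>>>     def ms(m: Map[String, Any], k: String) = m(k).asInstanceOf[Double].toLong
>>>     println(Seq(ms(info, "Finish Time"), ms(info, "Task ID"), info("Host"),
>>>                 ms(metrics, "Executor Run Time"), ms(metrics, "JVM GC Time")).mkString(","))
>>>   }
>>> }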
>>>
>>> 3. Time Metrics in Spark Event Log
>>> ==================================
>>> I am confused about the times reported for tasks in the event log.
>>> There are launch and finish timestamps given for each task (call them
>>> t1 and t2, respectively), as well as GC time (t_gc), execution time
>>> (t_exec), and serialization times (t_ser, t_deser). However, the times
>>> do not add up as I would have expected. I would imagine that the
>>> elapsed time t2 - t1 would be slightly larger than the sum of the
>>> component times. However, I can find many instances in the event logs
>>> where:
>>> (t2 - t1) < (t_gc + t_ser + t_deser + t_exec)
>>> The difference can be 500 ms or more, which is not negligible for my
>>> current execution times of ~5000 ms. I have attached a plot that
>>> illustrates this.
>>>
>>> Regarding this, I'd like to ask:
>>> 1. How exactly are these times being measured?
>>> 2. Should the sum of the component times equal the elapsed (clock)
>>> time for the task?
>>> 3. If not, which component(s) is (are) being excluded, and when do
>>> they occur?
>>> 4. There are occasionally reported measurements for Shuffle Write
>>> time, but not shuffle read time. Is there a method to determine the
>>> time required to shuffle data? Could this be done by looking at delays
>>> between the first task in a new stage and the last task in the
>>> previous stage?
>>>
>>> Thank you very much for your time,
>>> Mike
>>>
>>> On 6/7/15, Patrick Wendell <pwend...@gmail.com> wrote:
>>> > Hey Mike,
>>> >
>>> > Stage IDs are not guaranteed to be sequential because of the way the
>>> > DAG scheduler works (they are only guaranteed to be increasing). In
>>> > some cases stage ID numbers are skipped when stages are generated.
>>> >
>>> > Any stage/ID that appears in the Spark UI is an actual stage, so if
>>> > you see IDs in there, but they are not in the logs, then let us know
>>> > (that would be a bug).
>>> >
>>> > - Patrick
>>> >
>>> > On Sun, Jun 7, 2015 at 9:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>> >> Are you seeing the same behavior on the driver UI (the one running
>>> >> on port 4040)? If you click on the stage ID header you can sort the
>>> >> stages based on their IDs.
>>> >>
>>> >> Thanks
>>> >> Best Regards
>>> >>
>>> >> On Fri, Jun 5, 2015 at 10:21 PM, Mike Hynes <91m...@gmail.com> wrote:
>>> >>>
>>> >>> Hi folks,
>>> >>>
>>> >>> When I look at the output logs for an iterative Spark program, I see
>>> >>> that the stage IDs are not arithmetically numbered---that is, there
>>> >>> are gaps between stages and I might find log information about Stage
>>> >>> 0, 1, 2, 5, but not 3 or 4.
>>> >>>
>>> >>> As an example, the output from the Spark logs below shows what I
>>> >>> mean:
>>> >>>
>>> >>> # grep -rE "Stage [[:digit:]]+" spark_stderr | grep finished
>>> >>> 12048:INFO:DAGScheduler:Stage 0 (mapPartitions at blockMap.scala:1444) finished in 7.820 s:
>>> >>> 15994:INFO:DAGScheduler:Stage 1 (map at blockMap.scala:1810) finished in 3.874 s:
>>> >>> 18291:INFO:DAGScheduler:Stage 2 (count at blockMap.scala:1179) finished in 2.237 s:
>>> >>> 20121:INFO:DAGScheduler:Stage 4 (map at blockMap.scala:1817) finished in 1.749 s:
>>> >>> 21254:INFO:DAGScheduler:Stage 5 (count at blockMap.scala:1180) finished in 1.082 s:
>>> >>> 23422:INFO:DAGScheduler:Stage 7 (map at blockMap.scala:1810) finished in 2.078 s:
>>> >>> 24773:INFO:DAGScheduler:Stage 8 (count at blockMap.scala:1188) finished in 1.317 s:
>>> >>> 26455:INFO:DAGScheduler:Stage 10 (map at blockMap.scala:1817) finished in 1.638 s:
>>> >>> 27228:INFO:DAGScheduler:Stage 11 (count at blockMap.scala:1189) finished in 0.732 s:
>>> >>> 27494:INFO:DAGScheduler:Stage 14 (foreach at blockMap.scala:1302) finished in 0.192 s:
>>> >>> 27709:INFO:DAGScheduler:Stage 17 (foreach at blockMap.scala:1302) finished in 0.170 s:
>>> >>> 28018:INFO:DAGScheduler:Stage 20 (count at blockMap.scala:1201) finished in 0.270 s:
>>> >>> 28611:INFO:DAGScheduler:Stage 23 (map at blockMap.scala:1355) finished in 0.455 s:
>>> >>> 29598:INFO:DAGScheduler:Stage 24 (count at blockMap.scala:274) finished in 0.928 s:
>>> >>> 29954:INFO:DAGScheduler:Stage 27 (map at blockMap.scala:1355) finished in 0.305 s:
>>> >>> 30390:INFO:DAGScheduler:Stage 28 (count at blockMap.scala:275) finished in 0.391 s:
>>> >>> 30452:INFO:DAGScheduler:Stage 32 (first at MatrixFactorizationModel.scala:60) finished in 0.028 s:
>>> >>> 30506:INFO:DAGScheduler:Stage 36 (first at MatrixFactorizationModel.scala:60) finished in 0.023 s:
>>> >>>
>>> >>> Can anyone comment on this being normal behavior? Is it indicative
>>> >>> of faults causing stages to be resubmitted? I also cannot find the
>>> >>> missing stages in any stage's parent List(Stage x, Stage y, ...).
>>> >>>
>>> >>> Thanks,
>>> >>> Mike
>>> >>>
>>> >>> On 6/1/15, Reynold Xin <r...@databricks.com> wrote:
>>> >>> > Thanks, René. I actually added a warning to the new JDBC
>>> >>> > reader/writer interface for 1.4.0.
>>> >>> >
>>> >>> > Even with that, I think we should support throttling JDBC;
>>> >>> > otherwise it's too convenient for our users to DOS their
>>> >>> > production database servers!
>>> >>> >
>>> >>> > /**
>>> >>> >  * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
>>> >>> >  * url named table. Partitions of the table will be retrieved in parallel based on the parameters
>>> >>> >  * passed to this function.
>>> >>> >  *
>>> >>> >  * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
>>> >>> >  * your external database systems.
>>> >>> >  *
>>> >>> >  * @param url JDBC database url of the form `jdbc:subprotocol:subname`
>>> >>> >  * @param table Name of the table in the external database.
>>> >>> >  * @param columnName the name of a column of integral type that will be used for partitioning.
>>> >>> >  * @param lowerBound the minimum value of `columnName` used to decide partition stride
>>> >>> >  * @param upperBound the maximum value of `columnName` used to decide partition stride
>>> >>> >  * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
>>> >>> >  *                      evenly into this many partitions
>>> >>> >  * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
>>> >>> >  *                             tag/value. Normally at least a "user" and "password" property
>>> >>> >  *                             should be included.
>>> >>> >  *
>>> >>> >  * @since 1.4.0
>>> >>> >  */
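>>> >>> >
>>> >>> > For reference, a call to the 1.4 reader might look roughly like
>>> >>> > the following (just a sketch; the URL, table, bounds, and
>>> >>> > credentials are made up, and the JDBC driver jar has to be on the
>>> >>> > classpath):
>>> >>> >
>>> >>> > import java.util.Properties
>>> >>> >
>>> >>> > val props = new Properties()
>>> >>> > props.setProperty("user", "spark")       // assumed credentials
>>> >>> > props.setProperty("password", "secret")
>>> >>> >
>>> >>> > // Keep numPartitions modest so the parallel reads don't overwhelm the database.
>>> >>> > val df = sqlContext.read.jdbc(
>>> >>> >   "jdbc:postgresql://dbhost:5432/mydb",  // assumed url
>>> >>> >   "events",                              // assumed table
>>> >>> >   "id",                                  // integral column used for partitioning
>>> >>> >   0L,                                    // lowerBound
>>> >>> >   1000000L,                              // upperBound
>>> >>> >   8,                                     // numPartitions
>>> >>> >   props)
>>> >>> > df.count()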
>>> >>> >
>>> >>> > On Mon, Jun 1, 2015 at 1:54 AM, René Treffer <rtref...@gmail.com> wrote:
>>> >>> >
>>> >>> >> Hi,
>>> >>> >>
>>> >>> >> I'm using sqlContext.jdbc(uri, table, where).map(_ => 1).aggregate(0)(_+_,_+_)
>>> >>> >> on an interactive shell (where "where" is an Array[String] of 32
>>> >>> >> to 48 elements). (The code is tailored to our db, specifically
>>> >>> >> through the where conditions; I'd have otherwise posted it.)
>>> >>> >> That should be the DataFrame API, but I'm just trying to load
>>> >>> >> everything and discard it as soon as possible :-)
>>> >>> >>
>>> >>> >> (1) Never do a silent drop of the values by default: it kills
>>> >>> >> confidence. An option sounds reasonable. Some sort of insight /
>>> >>> >> log would be great. (How many columns of what type were
>>> >>> >> truncated? Why?)
>>> >>> >> Note that I could declare the field as string via JdbcDialects
>>> >>> >> (thank you guys for merging that :-) ).
>>> >>> >> I have quite bad experiences with silent drops / truncates of
>>> >>> >> columns and thus _like_ the strict way of Spark. It causes
>>> >>> >> trouble, but noticing later that your data was corrupted during
>>> >>> >> conversion is even worse.
>>> >>> >>
>>> >>> >> (2) SPARK-8004 https://issues.apache.org/jira/browse/SPARK-8004
>>> >>> >>
>>> >>> >> (3) One option would be to make it safe to use, the other option
>>> >>> >> would be to document the behavior (something like "WARNING: this
>>> >>> >> method tries to load as many partitions as possible, make sure
>>> >>> >> your database can handle the load or load them in chunks and use
>>> >>> >> union"). SPARK-8008
>>> >>> >> https://issues.apache.org/jira/browse/SPARK-8008
>>> >>> >>
>>> >>> >> Regards,
>>> >>> >> Rene Treffer
>>> >>>
>>> >>> --
>>> >>> Thanks,
>>> >>> Mike
>>>
>>> --
>>> Thanks,
>>> Mike
>
> --
> Thanks,
> Mike

--
Thanks,
Mike

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org