[
https://issues.apache.org/jira/browse/SPARK-5674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380978#comment-14380978
]
Josh Rosen commented on SPARK-5674:
-----------------------------------
It looks like one of the key parts of this patch is the addition of a dummy
task / DAGScheduler for being able to see how a job would be broken into stages
without having to run it. I can see how this is a useful way to hack around
the fact that we don't really have an abstraction for describing a "deferred
action" (so there's no way to talk about the job / plan that would be produced
by, say, {{treeReduce}} as a first-class construct).
There are a couple of risks / subtleties of this simulated execution that worry
me slightly, though. For actions or library calls that trigger multiple jobs,
the fact that we don't actually run the real job might lead to confusing
exceptions / errors or cause Spark to print a plan that doesn't represent what
would actually happen during real execution. For example, consider a call like
{{take()}}. From a user's perspective, this is just a single action, but it
actually triggers multiple jobs. The number of jobs run in {{take()}} depends
on the results of earlier jobs, so disabling execution will either cause
exceptions when control returns from SparkContext back to the driver code or
will lead to different behavior than the real job would exhibit (in take's
case, the simulated plan might include running jobs over all partitions in
multiple jobs, even though the real execution might run a single job). This
could also be a problem for libraries like MLlib that run multiple jobs under
the hood but expose higher-level abstractions to users.
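To make the {{take()}} concern concrete, here is a minimal sketch in plain Scala (no Spark dependency). It is a simplified model, not Spark's actual implementation: the {{TakeSketch}} object, the {{execute}} flag, and the "quadruple the partitions scanned" growth policy are all assumptions made for illustration. Each loop iteration stands for one job, and with {{execute = false}} every job returns nothing, which is roughly what a disabled scheduler would hand back to the driver.

```scala
object TakeSketch {
  // Simplified model of an incremental take(): each loop iteration is one "job".
  // Returns the collected elements and the number of jobs that were launched.
  def take(partitions: Seq[Seq[Int]], num: Int, execute: Boolean): (Seq[Int], Int) = {
    var collected = Vector.empty[Int]
    var partsScanned = 0
    var jobs = 0
    while (collected.size < num && partsScanned < partitions.size) {
      // Hypothetical growth policy: scan one partition first, then widen the scan.
      val numPartsToTry = if (partsScanned == 0) 1 else partsScanned * 4
      val batch = partitions.slice(partsScanned, partsScanned + numPartsToTry)
      // With execution disabled, the job produces no results at all.
      val results = if (execute) batch.flatten else Seq.empty[Int]
      collected = collected ++ results.take(num - collected.size)
      partsScanned += batch.size
      jobs += 1
    }
    (collected, jobs)
  }
}
```

With four partitions whose first partition already holds enough elements, the executed run stops after one job, while the simulated run keeps launching jobs until every partition has been scanned. The simulated plan therefore describes work that the real run would never do.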
We could certainly support executionless analysis of the stage graph for a
single job, though; this might be doable without needing to create a new
DAGScheduler, too (this would require some refactoring).
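As a rough illustration of what executionless analysis for a single job could look like, the sketch below walks a toy lineage graph and cuts it into stages at shuffle dependencies. None of these types are Spark's: {{Node}}, {{Narrow}}, {{Shuffle}}, and {{stages}} are hypothetical names, and the traversal leaves out things the real scheduler handles, such as de-duplicating shared parent stages.

```scala
object StageSketch {
  // Toy lineage model: a node depends on its parents through narrow or shuffle edges.
  sealed trait Dep { def parent: Node }
  case class Narrow(parent: Node) extends Dep
  case class Shuffle(parent: Node) extends Dep
  case class Node(name: String, deps: Seq[Dep] = Nil)

  // Returns the stages needed to compute `root`, parent stages first.
  // Each stage is the set of node names that are pipelined together.
  def stages(root: Node): Seq[Set[String]] = {
    // Gather every node reachable through narrow edges only, and remember
    // the parents that sit on the far side of a shuffle edge.
    def collect(n: Node): (Set[String], Seq[Node]) =
      n.deps.foldLeft((Set(n.name), Seq.empty[Node])) {
        case ((names, parents), Narrow(p)) =>
          val (ns, ps) = collect(p)
          (names ++ ns, parents ++ ps)
        case ((names, parents), Shuffle(p)) =>
          (names, parents :+ p)
      }
    val (names, shuffleParents) = collect(root)
    // Each shuffle parent roots its own stage(s); no de-duplication is attempted.
    shuffleParents.flatMap(p => stages(p)) :+ names
  }
}
```

For a lineage shaped like the join in the issue description (two mapped collections feeding a join), this produces three stages: one per map side and a final one for the join. Nothing is submitted to a scheduler to get that answer.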
> Spark Job Explain Plan Proof of Concept
> ---------------------------------------
>
> Key: SPARK-5674
> URL: https://issues.apache.org/jira/browse/SPARK-5674
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Reporter: Kostas Sakellis
>
> This is just a prototype of creating an explain plan for a job. Code can be
> found here: https://github.com/ksakellis/spark/tree/kostas-explainPlan-poc
> The code was written very quickly and so doesn't have any comments or tests and
> is probably buggy - hence it being a proof of concept.
> *How to Use*
> # {code}sc.explainOn <=> sc.explainOff{code} This will generate the explain
> plan and print it in the logs
> # {code}sc.enableExecution <=> sc.disableExecution{code} This will enable or
> disable execution of the job.
> Using these two knobs a user can choose to print the explain plan and/or
> disable the running of the job if they only want to see the plan.
> *Implementation*
> This is only a prototype and it is by no means production ready. The code is
> pretty hacky in places and a few shortcuts were made just to get the
> prototype working.
> The most interesting part of this commit is in the ExecutionPlanner.scala
> class. This class creates its own private instance of the DAGScheduler and
> passes into it a NoopTaskScheduler. The NoopTaskScheduler receives the
> created TaskSets from the DAGScheduler and records the stages -> tasksets.
> The NoopTaskScheduler also creates fake CompletionEvents and sends them to
> the DAGScheduler to move the scheduling along. It is done this way so that we
> can use the DAGScheduler unmodified thus reducing code divergence.
> The rest of the code is about processing the information produced by the
> ExecutionPlanner, creating a DAG with a bunch of metadata and printing it as
> a pretty ascii drawing. For drawing the DAG,
> https://github.com/mdr/ascii-graphs is used. This was just easier again to
> prototype.
> *How is this different than RDD#toDebugString?*
> The execution planner runs the job through the entire DAGScheduler so we can
> collect some metrics that are not presently available in the debugString. For
> example, we can report the binary size of the task which might be important
> if the closures are referencing large objects.
> In addition, because we execute the scheduler code from an action, we can get
> a more accurate picture of where the stage boundaries and dependencies are. An
> action such as treeReduce will generate a number of stages that you can't
> get just by doing .toDebugString on the rdd.
> *Limitations of this Implementation*
> Because this is a prototype there is a lot of lame stuff in this commit.
> # All of the code in SparkContext in particular sucks. This adds some code in
> the runJob() call and when it gets the plan it just writes it to the INFO
> log. We need to find a better way of exposing the plan to the caller so that
> they can print it, analyze it etc. Maybe we can use implicits or something?
> Not sure how best to do this yet.
> # Some of the actions will return through exceptions because we are basically
> faking a runJob(). If you want to try this, it is best to just use count()
> instead of say collect(). This will get fixed when we fix 1)
> # Because the ExplainPlanner creates its own DAGScheduler, there currently is
> no way to map the real stages to the "explainPlan" stages. So if a user turns
> on explain plan, and doesn't disable execution, we can't automatically add
> more metrics to the explain plan as they become available. The stageId in the
> plan and the stageId in the real scheduler will be different. This is
> important for when we add it to the webUI and users can track progress on the
> DAG.
> # We are using https://github.com/mdr/ascii-graphs to draw the DAG - not sure
> if we want to depend on that project.
> *Next Steps*
> # It would be good to get a few people to take a look at the code
> specifically at how the plan gets generated. Clone the package and give it a
> try with some of your jobs
> # If the approach looks okay overall, I can put together a mini design doc
> and add some answers to the above limitations of this approach.
> # Feedback most welcome.
> *Example Code:*
> {code}
> sc.explainOn
> sc.disableExecution
> val rdd = sc.parallelize(1 to 10, 4).map(key => (key.toString, key))
> val rdd2 = sc.parallelize(1 to 5, 2).map(key => (key.toString, key))
> rdd.join(rdd2)
> .count()
> {code}
> *Example Output:*
> {noformat}
> EXPLAIN PLAN:
> +---------------+ +---------------+
> | | | |
> |Stage: 0 @ map | |Stage: 1 @ map |
> | Tasks: 4 | | Tasks: 2 |
> | | | |
> +---------------+ +---------------+
> | |
> v v
> +-----------------+
> | |
> |Stage: 2 @ count |
> | Tasks: 4 |
> | |
> +-----------------+
> STAGE DETAILS:
> --------------
> Stage: 0
> Callsite: map at <console>:12
> ShuffleMapTask
> PartitionId: 0 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> ShuffleMapTask
> PartitionId: 1 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> ShuffleMapTask
> PartitionId: 2 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> ShuffleMapTask
> PartitionId: 3 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> RDD Chain:
> +---------------------+
> | |
> |RDD: 0 @ parallelize |
> |ParallelCollectionRDD|
> | |
> +---------------------+
> |
> v
> +----------------+
> | |
> | RDD: 1 @ map |
> |MapPartitionsRDD|
> | |
> +----------------+
> Stage: 1
> Callsite: map at <console>:12
> ShuffleMapTask
> PartitionId: 0 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> ShuffleMapTask
> PartitionId: 1 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> RDD Chain:
> +---------------------+
> | |
> |RDD: 2 @ parallelize |
> |ParallelCollectionRDD|
> | |
> +---------------------+
> |
> v
> +----------------+
> | |
> | RDD: 3 @ map |
> |MapPartitionsRDD|
> | |
> +----------------+
> Stage: 2
> Callsite: count at <console>:19
> ResultTask
> PartitionId: 0 Type: CoGroupPartition
> Binary Size: 2.4 KB
> ResultTask
> PartitionId: 1 Type: CoGroupPartition
> Binary Size: 2.4 KB
> ResultTask
> PartitionId: 2 Type: CoGroupPartition
> Binary Size: 2.4 KB
> ResultTask
> PartitionId: 3 Type: CoGroupPartition
> Binary Size: 2.4 KB
> RDD Chain:
> +--------------+
> | |
> |RDD: 4 @ join |
> | CoGroupedRDD |
> | |
> +--------------+
> |
> v
> +----------------+
> | |
> | RDD: 5 @ join |
> |MapPartitionsRDD|
> | |
> +----------------+
> |
> v
> +----------------+
> | |
> | RDD: 6 @ join |
> |MapPartitionsRDD|
> | |
> +----------------+
> {noformat}