[
https://issues.apache.org/jira/browse/SPARK-5674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-5674:
--------------------------------
Labels: bulk-closed (was: )
> Spark Job Explain Plan Proof of Concept
> ---------------------------------------
>
> Key: SPARK-5674
> URL: https://issues.apache.org/jira/browse/SPARK-5674
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Reporter: Kostas Sakellis
> Priority: Major
> Labels: bulk-closed
>
> This is just a prototype of creating an explain plan for a job. Code can be
> found here: https://github.com/ksakellis/spark/tree/kostas-explainPlan-poc
> The code was written very quickly, so it has no comments or tests and is
> probably buggy - hence it being a proof of concept.
> *How to Use*
> # {code}sc.explainOn <=> sc.explainOff{code} This will generate the explain
> plan and print it in the logs.
> # {code}sc.enableExecution <=> sc.disableExecution{code} This will disable
> execution of the job.
> Using these two knobs a user can choose to print the explain plan and/or
> disable the running of the job if they only want to see the plan.
> *Implementation*
> This is only a prototype and it is by no means production ready. The code is
> pretty hacky in places and a few shortcuts were made just to get the
> prototype working.
> The most interesting part of this commit is in the ExecutionPlanner.scala
> class. This class creates its own private instance of the DAGScheduler and
> passes it a NoopTaskScheduler. The NoopTaskScheduler receives the TaskSets
> created by the DAGScheduler and records the stage -> TaskSet mapping.
> The NoopTaskScheduler also creates fake CompletionEvents and sends them to
> the DAGScheduler to move the scheduling along. It is done this way so that we
> can use the DAGScheduler unmodified, thus reducing code divergence.
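> The record-and-fake-completion idea can be sketched roughly as follows. This
> is a simplified illustration, not the actual prototype code: the method
> signatures are reduced to the essentials and do not match Spark's real
> TaskScheduler interface.
> {code}
> // Sketch of the NoopTaskScheduler idea: record every TaskSet the
> // DAGScheduler submits, then immediately feed fake success events back so
> // the scheduler keeps planning downstream stages without running anything.
> // Signatures are illustrative, not Spark's actual TaskScheduler API.
> class NoopTaskScheduler(dagScheduler: DAGScheduler) extends TaskScheduler {
>   // stageId -> the TaskSet the DAGScheduler produced for that stage
>   val recordedTaskSets = scala.collection.mutable.Map[Int, TaskSet]()
>
>   override def submitTasks(taskSet: TaskSet): Unit = {
>     recordedTaskSets(taskSet.stageId) = taskSet
>     // Fake a successful completion for every task so the DAGScheduler
>     // unblocks the dependent stages and submits them too.
>     taskSet.tasks.foreach { task =>
>       dagScheduler.taskEnded(task, Success, null, null)
>     }
>   }
> }
> {code}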
> The rest of the code is about processing the information produced by the
> ExecutionPlanner, creating a DAG with a bunch of metadata, and printing it as
> a pretty ASCII drawing. https://github.com/mdr/ascii-graphs is used for
> drawing the DAG; again, this was simply the easiest option for prototyping.
> *How is this different from RDD#toDebugString?*
> The execution planner runs the job through the entire DAGScheduler, so we can
> collect some metrics that are not presently available in the debug string. For
> example, we can report the binary size of each task, which might be important
> if the closures are referencing large objects.
> In addition, because we execute the scheduler code from an action, we can get
> a more accurate picture of where the stage boundaries and dependencies are. An
> action such as treeReduce will generate a number of stages that you can't
> see just by calling .toDebugString on the RDD.
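> To see the difference, compare what the lineage shows for a multi-stage
> action. treeReduce is a real RDD method, but the snippet below is only an
> illustration and assumes a running SparkContext sc:
> {code}
> // treeReduce aggregates in multiple rounds (here depth = 2), so executing
> // it involves intermediate aggregation stages. rdd.toDebugString prints
> // only the RDD lineage, not the stages an action will create, whereas the
> // explain plan comes from the DAGScheduler itself and shows every stage.
> val rdd = sc.parallelize(1 to 1000, 16)
> println(rdd.toDebugString)  // lineage only, no stage information
> val sum = rdd.treeReduce(_ + _, depth = 2)
> {code}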
> *Limitations of this Implementation*
> Because this is a prototype, there is a lot of lame stuff in this commit.
> # All of the code in SparkContext in particular sucks. This adds some code in
> the runJob() call, and when it gets the plan it just writes it to the INFO
> log. We need to find a better way of exposing the plan to the caller so that
> they can print it, analyze it, etc. Maybe we can use implicits or something?
> Not sure how best to do this yet.
> # Some of the actions will fail with exceptions because we are basically
> faking runJob(). If you want to try this, it is best to just use count()
> instead of, say, collect(). This will get fixed when we fix 1).
> # Because the ExplainPlanner creates its own DAGScheduler, there is currently
> no way to map the real stages to the "explainPlan" stages. So if a user turns
> on explain plan and doesn't disable execution, we can't automatically add
> more metrics to the explain plan as they become available. The stageId in the
> plan and the stageId in the real scheduler will be different. This is
> important for when we add it to the webUI and users can track progress on the
> DAG.
> # We are using https://github.com/mdr/ascii-graphs to draw the DAG - not sure
> if we want to depend on that project.
> *Next Steps*
> # It would be good to get a few people to take a look at the code,
> specifically at how the plan gets generated. Clone the branch and give it a
> try with some of your jobs.
> # If the approach looks okay overall, I can put together a mini design doc
> and add some answers to the above limitations of this approach.
> # Feedback is most welcome.
> *Example Code:*
> {code}
> sc.explainOn
> sc.disableExecution
> val rdd = sc.parallelize(1 to 10, 4).map(key => (key.toString, key))
> val rdd2 = sc.parallelize(1 to 5, 2).map(key => (key.toString, key))
> rdd.join(rdd2)
> .count()
> {code}
> *Example Output:*
> {noformat}
> EXPLAIN PLAN:
> +---------------+ +---------------+
> | | | |
> |Stage: 0 @ map | |Stage: 1 @ map |
> | Tasks: 4 | | Tasks: 2 |
> | | | |
> +---------------+ +---------------+
> | |
> v v
> +-----------------+
> | |
> |Stage: 2 @ count |
> | Tasks: 4 |
> | |
> +-----------------+
> STAGE DETAILS:
> --------------
> Stage: 0
> Callsite: map at <console>:12
> ShuffleMapTask
> PartitionId: 0 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> ShuffleMapTask
> PartitionId: 1 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> ShuffleMapTask
> PartitionId: 2 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> ShuffleMapTask
> PartitionId: 3 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> RDD Chain:
> +---------------------+
> | |
> |RDD: 0 @ parallelize |
> |ParallelCollectionRDD|
> | |
> +---------------------+
> |
> v
> +----------------+
> | |
> | RDD: 1 @ map |
> |MapPartitionsRDD|
> | |
> +----------------+
> Stage: 1
> Callsite: map at <console>:12
> ShuffleMapTask
> PartitionId: 0 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> ShuffleMapTask
> PartitionId: 1 Type: ParallelCollectionPartition
> Binary Size: 2.2 KB
> RDD Chain:
> +---------------------+
> | |
> |RDD: 2 @ parallelize |
> |ParallelCollectionRDD|
> | |
> +---------------------+
> |
> v
> +----------------+
> | |
> | RDD: 3 @ map |
> |MapPartitionsRDD|
> | |
> +----------------+
> Stage: 2
> Callsite: count at <console>:19
> ResultTask
> PartitionId: 0 Type: CoGroupPartition
> Binary Size: 2.4 KB
> ResultTask
> PartitionId: 1 Type: CoGroupPartition
> Binary Size: 2.4 KB
> ResultTask
> PartitionId: 2 Type: CoGroupPartition
> Binary Size: 2.4 KB
> ResultTask
> PartitionId: 3 Type: CoGroupPartition
> Binary Size: 2.4 KB
> RDD Chain:
> +--------------+
> | |
> |RDD: 4 @ join |
> | CoGroupedRDD |
> | |
> +--------------+
> |
> v
> +----------------+
> | |
> | RDD: 5 @ join |
> |MapPartitionsRDD|
> | |
> +----------------+
> |
> v
> +----------------+
> | |
> | RDD: 6 @ join |
> |MapPartitionsRDD|
> | |
> +----------------+
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)