[ https://issues.apache.org/jira/browse/SPARK-23674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718527#comment-16718527 ]

ASF GitHub Bot commented on SPARK-23674:
----------------------------------------

HyukjinKwon opened a new pull request #23263: [SPARK-23674][ML] Adds Spark ML 
Events
URL: https://github.com/apache/spark/pull/23263
 
 
   ## What changes were proposed in this pull request?
   
   This PR proposes to add ML events so that other developers can track them and take actions on them.
   
   ## Introduction
   
   ML events (like SQL events) can be quite useful when people want to track ML operations and react to them. For instance, I have been working on integrating 
   Apache Spark with [Apache Atlas](https://atlas.apache.org/QuickStart.html). With some custom changes on top of this PR, I can visualise an ML pipeline as below:
   
   
![spark_ml_streaming_lineage](https://user-images.githubusercontent.com/6477701/49682779-394bca80-faf5-11e8-85b8-5fae28b784b3.png)
   
   Another point worth considering is that these events can be correlated with the existing SQL/Streaming events, for instance to find out where the input `Dataset` originated. With current Apache Spark, I can already visualise SQL operations as below:
   
   ![screen shot 2018-12-10 at 9 41 36 am](https://user-images.githubusercontent.com/6477701/49706269-d9bdfe00-fc5f-11e8-943a-3309d1856ba5.png)
   
   I think we can combine those existing lineages to easily understand where the data comes from and goes to. Currently, the ML side is a gap, so the lineages cannot be connected in current Apache Spark.
   
   On top of that, it goes without saying how useful it is to track SQL/Streaming operations. Likewise, I would like to propose ML events as well (as lowest-stability `@Unstable` APIs for now, with no guarantee about stability).
   
   ## Implementation Details
   
   ### Send events (but do not expose an ML-specific listener)
   
   **`mllib/src/main/scala/org/apache/spark/ml/events.scala`**
   
   ```scala
   @Unstable
   case class ...StartEvent(caller, input)
   @Unstable
   case class ...EndEvent(caller, output)
   
   object MLEvents {
     // Wrappers to send events:
     // def with...Event(body) = {
     //   body()
     //   SparkContext.getOrCreate().listenerBus.post(event)
     // }
   }
   ```
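
   To make the shape of these APIs concrete, here is a minimal, self-contained sketch under stated assumptions: it assumes the file lives inside the Spark source tree (so `listenerBus`, which is `private[spark]`, is accessible), and the event names, fields and wrapper signature are illustrative only, not the exact ones in this patch:
   
   ```scala
   package org.apache.spark.ml
   
   import org.apache.spark.SparkContext
   import org.apache.spark.annotation.Unstable
   import org.apache.spark.scheduler.SparkListenerEvent
   
   // Hypothetical concrete events; names and fields are assumptions for illustration.
   @Unstable
   case class FitStartEvent(estimatorClass: String) extends SparkListenerEvent
   @Unstable
   case class FitEndEvent(estimatorClass: String, modelClass: String) extends SparkListenerEvent
   
   object MLEvents {
     // Post a start event, run the body, then post an end event with the result type.
     def withFitEvent[M](estimatorClass: String)(body: => M): M = {
       val bus = SparkContext.getOrCreate().listenerBus
       bus.post(FitStartEvent(estimatorClass))
       val model = body
       bus.post(FitEndEvent(estimatorClass, model.getClass.getName))
       model
     }
   }
   ```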
   
   This approach mimics both of the following:
   
   **1. Catalog events (see 
`org/apache/spark/sql/catalyst/catalog/events.scala`)**
   
   - This allows a Catalog-specific listener, `ExternalCatalogEventListener`, to be added.
   
   - It is implemented by wrapping the whole `ExternalCatalog` in `ExternalCatalogWithListener`,
   which delegates the operations to the underlying `ExternalCatalog` (a simplified sketch of this delegation pattern follows after item 2 below).
   
   This wrapping approach is not quite possible in our case because most instances (like `Pipeline`) are created directly in most cases. We might be able to do it by extending `ListenerBus` for all possible instances, but IMHO that is too invasive. Also, exposing another ML-specific listener sounds like a bit too much at this stage. Therefore, I simply borrowed the file name and structure here.
   
   **2. SQL execution events (see 
`org/apache/spark/sql/execution/SQLExecution.scala`)**
   
   - An object wraps the body of an operation in order to send events.
   
   The current approach is rather close to this: it has a `with...` wrapper to send events. I borrowed this approach to stay consistent.
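
   For contrast, below is a simplified, self-contained sketch of the delegation-wrapper pattern from item 1; all names here are made up for illustration and do not match the real `ExternalCatalog` API. The `with...` wrapper pattern from item 2 is the one the `MLEvents` sketch above follows.
   
   ```scala
   // Illustrative stand-ins; not the real catalog types or events.
   sealed trait CatalogEvent
   case class CreateTablePreEvent(table: String) extends CatalogEvent
   case class CreateTableEvent(table: String) extends CatalogEvent
   
   trait Catalog {
     def createTable(table: String): Unit
   }
   
   // Wrapper that delegates every operation and posts events around it,
   // analogous in spirit to ExternalCatalogWithListener.
   class CatalogWithListener(delegate: Catalog, post: CatalogEvent => Unit) extends Catalog {
     override def createTable(table: String): Unit = {
       post(CreateTablePreEvent(table))
       delegate.createTable(table)   // delegate the real work
       post(CreateTableEvent(table))
     }
   }
   ```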
   
   
   ### Add `...Impl` methods and wrap each call to send events
   
   **`mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala`**
   
   ```diff
   - def save(...) = { saveImpl(...) }
   + def save(...) = MLEvents.withSaveInstanceEvent { saveImpl(...) }
     def saveImpl(...): Unit = ...
   ```
   
     Note that `saveImpl` was already implemented, unlike the other instances below.
   
   
   ```diff
   - def load(...): T
   + def load(...): T = MLEvents.withLoadInstanceEvent { loadImpl(...) }
   + def loadImpl(...): T
   ```
   
   **`mllib/src/main/scala/org/apache/spark/ml/Estimator.scala`**
   
   ```diff
   - def fit(...): Model
   + def fit(...): Model = MLEvents.withFitEvent { fitImpl(...) }
   + def fitImpl(...): Model
   ```
   
   **`mllib/src/main/scala/org/apache/spark/ml/Transformer.scala`**
   
   ```diff
   - def transform(...): DataFrame
   + def transform(...): DataFrame = MLEvents.withTransformEvent { transformImpl(...) }
   + def transformImpl(...): DataFrame
   ```
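
   To illustrate this pattern, here is a minimal, self-contained sketch of how the `fit`/`fitImpl` split could look on an `Estimator`-like base class. The types and the event wrapper are simplified assumptions for illustration, not the exact code in this PR:
   
   ```scala
   // Simplified stand-in type so the sketch compiles on its own.
   case class FakeDataset(name: String)
   
   abstract class EstimatorLike[M] {
     // The public API keeps its signature; it now only wraps fitImpl.
     def fit(dataset: FakeDataset): M =
       withFitEvent(getClass.getName) { fitImpl(dataset) }
   
     // Subclasses implement the actual fitting logic here.
     protected def fitImpl(dataset: FakeDataset): M
   
     // Stand-in for MLEvents.withFitEvent; the real one would post
     // start/end events to the Spark listener bus around the body.
     private def withFitEvent[T](caller: String)(body: => T): T = {
       println(s"fit started: $caller")
       val result = body
       println(s"fit finished: $caller")
       result
     }
   }
   ```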
   
   This approach follows existing patterns in ML, as shown below:
   
   **1. `transform` and `transformImpl`**
   
   
https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala#L202-L213
   
   
https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala#L191-L196
   
   
https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala#L1037-L1042
   
   **2. `save` and `saveImpl`**
   
   
https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala#L166-L176
   
   Inherited ones are intentionally omitted here for simplicity; they are inherited and implemented in multiple places.
   
   ## Usage
   
   It needs a custom listener implementation. For instance, with the custom listener below:
   
   ```scala
   import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
   
   // MLEvent is the (parent) event type proposed in this PR.
   class CustomMLListener extends SparkListener {
     override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
       case e: MLEvent => // do something
       case _ => // pass
     }
   }
   ```
   
   There are two (existing) ways to register it:
   
   ```scala
   spark.sparkContext.addSparkListener(new CustomMLListener)
   ```
   
   ```bash
   spark-submit ...\
     --conf spark.extraListeners=CustomMLListener\
     ...
   ```
   
   This is also similar to the existing implementations on the SQL side.
   
   ## Target users
   
   1. I think people in general would likely utilise this feature like other event listeners. At least, I can see some interest from outside:
   
       - SQL Listener
         - https://stackoverflow.com/questions/46409339/spark-listener-to-an-sql-query
         - http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-Custom-Query-Execution-listener-via-conf-properties-td30979.html
   
       - Streaming Query Listener
         - https://jhui.github.io/2017/01/15/Apache-Spark-Streaming/
         - http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-with-Watermark-td25413.html#a25416
   
   2. Someone would likely use this via Atlas. The plugin is intentionally exposed (mirrored) at [spark-atlas-connector](https://github.com/hortonworks-spark/spark-atlas-connector) so that anyone can build lineage and governance features on top of Atlas. I am trying to show integrated lineages in Apache Spark, but the ML side is currently a missing piece, and this change is needed to fill it.
   
   
   ## Backward Compatibility
   
   _This keeps both source and binary backward compatibility_. I considered enforcing `...Impl` by leaving the methods abstract to force implementation, but decided instead to give them a body that throws `UnsupportedOperationException`, so that we can keep full source and binary compatibility.
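
   A minimal sketch of such a default body is shown below; the exact message and signatures are assumptions for illustration only:
   
   ```scala
   import org.apache.spark.sql.Dataset
   
   abstract class EstimatorBase[M] {
     def fit(dataset: Dataset[_]): M = fitImpl(dataset)  // event wrapping elided
   
     // Not abstract: existing subclasses that only override fit(...) still
     // compile and never reach this, preserving source/binary compatibility.
     protected def fitImpl(dataset: Dataset[_]): M =
       throw new UnsupportedOperationException("fitImpl is not implemented.")
   }
   ```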
   
   - From the user-facing API perspective, _there is no difference_. The `...Impl` methods are protected and not visible to end users.
   
   - From the developer API perspective, if some developers want to override the `...` methods instead of `...Impl`, that is still fine; it just means no events are sent. If developers want events to be sent from their custom implementation, they should implement `...Impl`, which is of course encouraged.
   
     For instance,
   
     ```scala
     class Pipeline extends Estimator[PipelineModel] {
       def fit(dataset: Dataset[_]): PipelineModel = {
         ...
       }
     }
     ```
     still works fine without any behaviour changes.
   
     If developers want their pipeline to emit events, they should change:
   
     ```diff
       class Pipeline extends Estimator[PipelineModel] {
     -   def fit(dataset: Dataset[_]): PipelineModel = {
     +   def fitImpl(dataset: Dataset[_]): PipelineModel = {
           ...
         }
       }
     ```
   
      _^ this is the only API change this PR causes._
   
   
   ## How was this patch tested?
   
   Manually tested and unit tests were added.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Spark ML Listener for Tracking ML Pipeline Status
> -----------------------------------------------------
>
>                 Key: SPARK-23674
>                 URL: https://issues.apache.org/jira/browse/SPARK-23674
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>    Affects Versions: 2.3.0
>            Reporter: Mingjie Tang
>            Priority: Major
>
> Currently, Spark provides status monitoring for different components of 
> Spark, such as the Spark history server, streaming listener, SQL listener, etc. 
> The use cases would be (1) a front-end UI to track the training convergence 
> rate during iterations, so that data scientists can understand how a job converges 
> during training, e.g. for K-means, logistic regression and other linear models, and (2) 
> tracking the data lineage for the input and output of training data.  
> In this proposal, we hope to provide a Spark ML pipeline listener to track the 
> status of the Spark ML pipeline, including: 
>  # ML pipeline created and saved 
>  # ML pipeline model created, saved and loaded 
>  # ML model training status monitoring



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
