[ https://issues.apache.org/jira/browse/SPARK-23674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718527#comment-16718527 ]
ASF GitHub Bot commented on SPARK-23674: ---------------------------------------- HyukjinKwon opened a new pull request #23263: [SPARK-23674][ML] Adds Spark ML Events URL: https://github.com/apache/spark/pull/23263

## What changes were proposed in this pull request?

This PR proposes to add ML events so that other developers can track them and take actions on them.

## Introduction

ML events (like SQL events) can be quite useful when people want to track corresponding ML operations and act on them. For instance, I have been working on integrating Apache Spark with [Apache Atlas](https://atlas.apache.org/QuickStart.html). With some custom changes on top of this PR, I can visualise an ML pipeline as below:

![spark_ml_streaming_lineage](https://user-images.githubusercontent.com/6477701/49682779-394bca80-faf5-11e8-85b8-5fae28b784b3.png)

Another benefit worth considering is that these events can interact with other SQL/Streaming events, for instance to show where an input `Dataset` originated. With current Apache Spark, I can already visualise SQL operations as below:

![screen shot 2018-12-10 at 9 41 36 am](https://user-images.githubusercontent.com/6477701/49706269-d9bdfe00-fc5f-11e8-943a-3309d1856ba5.png)

I think we can combine these existing lineages to easily understand where data comes from and where it goes. Currently, the ML side is a gap, so the lineages cannot be connected in current Apache Spark. The usefulness of tracking SQL/Streaming operations hardly needs explaining; likewise, I would like to propose ML events as well (as lowest-stability `@Unstable` APIs for now, with no guarantee about stability).

## Implementation Details

### Sends events (but does not expose an ML-specific listener)

**`mllib/src/main/scala/org/apache/spark/ml/events.scala`**

```scala
@Unstable case class ...StartEvent(caller, input)
@Unstable case class ...EndEvent(caller, output)

object MLEvents {
  // Wrappers to send events:
  // def with...Event(body) = {
  //   body()
  //   SparkContext.getOrCreate().listenerBus.post(event)
  // }
}
```

This mimics both:

**1. Catalog events (see `org/apache/spark/sql/catalyst/catalog/events.scala`)**

- This allows a catalog-specific listener, `ExternalCatalogEventListener`, to be added.
- It is implemented by wrapping the whole `ExternalCatalog` in an `ExternalCatalogWithListener`, which delegates the operations to the underlying `ExternalCatalog`.

This is not quite possible in our case because most instances (like `Pipeline`) are usually created directly. We might be able to do it by extending `ListenerBus` for all possible instances, but IMHO that is too invasive. Also, exposing another ML-specific listener sounds like too much at this stage. Therefore, I simply borrowed the file name and structure here.

**2. SQL execution events (see `org/apache/spark/sql/execution/SQLExecution.scala`)**

- Adds an object that wraps a body to send events.

The current approach is closest to this: it has `with...` wrappers that send events. I borrowed this approach to be consistent.
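To make the wrap-and-post pattern concrete, here is a minimal sketch (not the PR's actual code): the event classes, field names, and object name are hypothetical, and since `SparkContext.listenerBus` is `private[spark]`, the sketch assumes it lives under the `org.apache.spark` package:

```scala
package org.apache.spark.ml

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical event payloads; the real PR defines one Start/End pair per operation.
case class FitStartEvent(estimatorClass: String) extends SparkListenerEvent
case class FitEndEvent(estimatorClass: String) extends SparkListenerEvent

object MLEventsSketch {
  // Runs `body` and posts start/end events around it on the shared listener bus,
  // in the same wrap-the-body style as SQLExecution's with... helpers.
  def withFitEvent[T](estimatorClass: String)(body: => T): T = {
    val bus = SparkContext.getOrCreate().listenerBus // private[spark], hence the package above
    bus.post(FitStartEvent(estimatorClass))
    val result = body
    bus.post(FitEndEvent(estimatorClass))
    result
  }
}
```

A `fit` implementation would then simply call `MLEventsSketch.withFitEvent(this.getClass.getName) { fitImpl(dataset) }`.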
### Add `...Impl` methods and wrap each call to send events

**`mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala`**

```diff
- def save(...) = { saveImpl(...) }
+ def save(...) = MLEvents.withSaveInstanceEvent { saveImpl(...) }
  def saveImpl(...): Unit = ...
```

Note that `saveImpl` was already implemented, unlike the other instances below.

```diff
- def load(...): T
+ def load(...): T = MLEvents.withLoadInstanceEvent { loadImpl(...) }
+ def loadImpl(...): T
```

**`mllib/src/main/scala/org/apache/spark/ml/Estimator.scala`**

```diff
- def fit(...): Model
+ def fit(...): Model = MLEvents.withFitEvent { fitImpl(...) }
+ def fitImpl(...): Model
```

**`mllib/src/main/scala/org/apache/spark/ml/Transformer.scala`**

```diff
- def transform(...): DataFrame
+ def transform(...): DataFrame = MLEvents.withTransformEvent { transformImpl(...) }
+ def transformImpl(...): DataFrame
```

This approach follows existing patterns in ML:

**1. `transform` and `transformImpl`**

https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala#L202-L213
https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala#L191-L196
https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala#L1037-L1042

**2. `save` and `saveImpl`**

https://github.com/apache/spark/blob/9b1f6c8bab5401258c653d4e2efb50e97c6d282f/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala#L166-L176

Inherited methods are intentionally omitted here for simplicity; they are inherited and implemented in multiple places.

## Usage

Using this requires a custom `SparkListener` implementation. For instance, with the listener below:

```scala
class CustomMLListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: MLEvent => // do something
    case _ => // pass
  }
}
```

there are two (existing) ways to register it:

```scala
spark.sparkContext.addSparkListener(new CustomMLListener)
```

```bash
spark-submit ... \
  --conf spark.extraListeners=CustomMLListener \
  ...
```

This mirrors the existing listener implementations on the SQL side.
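For completeness, here is a sketch of how the pieces could fit together in an application, assuming the `MLEvent` trait proposed by this PR is available under `org.apache.spark.ml`; the listener and object names are illustrative:

```scala
import scala.collection.mutable

import org.apache.spark.ml.MLEvent // proposed by this PR; location assumed
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession

// Buffers ML events for later inspection, e.g. in a test or a lineage tool.
class CollectingMLListener extends SparkListener {
  val events = mutable.ArrayBuffer.empty[MLEvent]
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: MLEvent => events.synchronized { events += e }
    case _ => // not an ML event; ignore
  }
}

object MLEventsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("ml-events-demo").getOrCreate()
    val listener = new CollectingMLListener
    spark.sparkContext.addSparkListener(listener)
    // ... run Pipeline.fit / model.transform / MLWriter.save here ...
    // listener.events will then contain the corresponding Start/End events
    // (the listener bus delivers them asynchronously).
    spark.stop()
  }
}
```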
## Target users

1. General users would likely use this feature the same way they use other event listeners. At least, I can see some interest outside the project:
   - SQL listener
     - https://stackoverflow.com/questions/46409339/spark-listener-to-an-sql-query
     - http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-Custom-Query-Execution-listener-via-conf-properties-td30979.html
   - Streaming query listener
     - https://jhui.github.io/2017/01/15/Apache-Spark-Streaming/
     - http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-with-Watermark-td25413.html#a25416
2. Some users would likely use this via Atlas. The connector is intentionally published at [spark-atlas-connector](https://github.com/hortonworks-spark/spark-atlas-connector) so that anyone can build lineage and governance features on top of Atlas. I am trying to show integrated lineage in Apache Spark, and ML is currently the missing piece; this change is required for that.

## Backward Compatibility

_This keeps both source and binary backward compatibility_. I considered making the `...Impl` methods abstract to force implementations, but instead decided to give them default bodies that throw `UnsupportedOperationException`, so that full source and binary compatibility are preserved.

- From the user-facing API perspective, _there is no difference_. The `...Impl` methods are protected and not visible to end users.
- From the developer API perspective, overriding the `...` methods instead of `...Impl` is still fine; it just does not emit events. Developers who want events emitted from their custom implementations should implement `...Impl` instead, and doing so is encouraged.

For instance,

```scala
class Pipeline extends Estimator[PipelineModel] {
  def fit(dataset: Dataset[_]): PipelineModel = {
    ...
  }
}
```

still works fine without any behaviour change. If developers want their pipeline to emit events, they should change it to:

```diff
class Pipeline extends Estimator[PipelineModel] {
-  def fit(dataset: Dataset[_]): PipelineModel = {
+  def fitImpl(dataset: Dataset[_]): PipelineModel = {
    ...
  }
}
```

_^ this is the only API change this PR causes._

## How was this patch tested?

Manually tested, and unit tests were added.

> Add Spark ML Listener for Tracking ML Pipeline Status
> -----------------------------------------------------
>
> Key: SPARK-23674
> URL: https://issues.apache.org/jira/browse/SPARK-23674
> Project: Spark
> Issue Type: Improvement
> Components: ML
> Affects Versions: 2.3.0
> Reporter: Mingjie Tang
> Priority: Major
>
> Currently, Spark provides status monitoring for different components of Spark, such as the Spark history server, the streaming listener, the SQL listener, etc. The use cases would be (1) a front-end UI to track the training convergence rate during iterations, so that data scientists can understand how a job converges during training, e.g. for K-means, logistic regression, and other linear models; and (2) tracking the data lineage of the training data's inputs and outputs.
> In this proposal, we hope to provide a Spark ML pipeline listener to track the status of the Spark ML pipeline, including:
> # ML pipeline created and saved
> # ML pipeline model created, saved, and loaded
> # ML model training status monitoring