Please follow up on the discussion in the original PR: https://github.com/apache/spark/pull/26127

Dataset.observe() relies on the query listener for batch queries, which is an "unstable" API. That's why we decided not to add an example for the batch query. For streaming queries, it relies on the streaming query listener, which is a stable API. That said, personally I'd first consider whether the new API fits streaming queries, and then see whether it fits batch queries as well.

If we find Dataset.observe() to be useful enough for batch queries, we'd probably be better off discussing how to provide these metrics through a stable API (so that Scala users can leverage it), and looking at PySpark later. That looks to be the first thing to do if we have a consensus on the usefulness of observable metrics for batch queries.

On Tue, Mar 16, 2021 at 4:17 PM Enrico Minack <m...@enrico.minack.dev> wrote:

> I am focusing on batch mode, not streaming mode. I would argue that
> Dataset.observe() is equally useful for large batch processing. If you
> need some motivating use cases, please let me know.
>
> Anyhow, the documentation of observe states that it works for both batch
> and streaming. And in batch mode, the helper class Observation helps reduce
> code and avoid repetition.
>
> The PySpark implementation of the Observation class can implement *all*
> methods by merely calling into their JVM counterparts, where the locking,
> listening, registration and un-registration happen. I think this qualifies
> as: "all the logic happens in the JVM". All that is transferred to Python
> is a row's data. No listeners needed.
>
> Enrico
>
> On 16.03.21 at 00:13, Jungtaek Lim wrote:
>
> If I remember correctly, the major audience of the "observe" API is
> Structured Streaming, micro-batch mode. From the example, the abstraction
> in 2 isn't something that works with Structured Streaming. It could still
> be done with a callback, but the question remains how much complexity is
> hidden by the abstraction.
>
> I see you're focusing on PySpark. I'm not sure whether there is an intention
> not to expose the query listener / streaming query listener to PySpark, but
> if there's some valid reason for that, I'm not sure we'd want to expose
> them to PySpark in any way. 2 doesn't make sense to me with PySpark: how
> do you ensure all the logic is happening in the JVM and you can leverage
> these values from PySpark?
> (I see there's support for listeners with DStream in PySpark, so there
> might be reasons not to add the same for SQL/SS. Probably a lesson learned?)
>
> On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack <m...@enrico.minack.dev>
> wrote:
>
>> Hi Spark-Devs,
>>
>> the observable metrics that have been added to the Dataset API in 3.0.0
>> are a great improvement over the Accumulator APIs that seem to have much
>> weaker guarantees. I have two questions regarding follow-up contributions:
>>
>> *1. Add observe to Python DataFrame*
>>
>> As far as I can see from the master branch, there is no equivalent in the
>> Python API. Is this something planned to happen, or is it missing because
>> there are no listeners in PySpark, which renders this method useless in
>> Python? I would be happy to contribute here.
>>
>> *2. Add Observation class to simplify result access*
>>
>> The Dataset.observe method requires users to register listeners
>> <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#observe(name:String,expr:org.apache.spark.sql.Column,exprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]>
>> with QueryExecutionListener or StreamingQueryListener to obtain the
>> result.
>> I think for simple setups, this could be hidden behind a common
>> helper class here called Observation, which reduces the usage of observe
>> to three lines of code:
>>
>> // our Dataset (this does not count as a line of code)
>> val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")
>> // define the observation we want to make
>> val observation = Observation("stats", count($"id"), sum($"id"))
>> // add the observation to the Dataset and execute an action on it
>> val cnt = df.observe(observation).count()
>> // retrieve the result
>> assert(observation.get === Row(4, 15))
>>
>> The Observation class can handle the registration and de-registration of
>> the listener, as well as properly accessing the result across thread
>> boundaries.
>>
>> With *2.*, the observe method can be added to PySpark without
>> introducing listeners there at all. All the logic is happening in the JVM.
>>
>> Thanks for your thoughts on this.
>>
>> Regards,
>> Enrico
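
For reference, the listener boilerplate that the quoted proposal wants to hide behind Observation looks roughly like the following in batch mode. This is only a minimal sketch, assuming Spark 3.0+ running with a local master; the object name, the "stats" metric name, the column aliases and the 10-second timeout are made up for illustration and are not part of the PR. The latch is there because listener callbacks may be delivered on a different thread than the one running the action.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions.{count, sum}
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical object name; a sketch of the manual approach, not the proposed helper.
object ObserveBatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")            // assumption: local test session
      .appName("observe-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")

    // Holders used to hand the metrics row over from the listener thread.
    val result = new AtomicReference[Row]()
    val done = new CountDownLatch(1)

    val listener = new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        // observedMetrics maps the name given to observe() to a Row of metric values.
        qe.observedMetrics.get("stats").foreach { row =>
          result.set(row)
          done.countDown()
        }
      }
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
    }

    spark.listenerManager.register(listener)
    try {
      // Attach the metrics and run an action so the listener fires.
      df.observe("stats", count($"id").as("rows"), sum($"id").as("total")).count()
      // Wait for the callback; the timeout value is arbitrary.
      done.await(10, TimeUnit.SECONDS)
    } finally {
      spark.listenerManager.unregister(listener)
    }

    println(result.get())
    spark.stop()
  }
}
```

The register/await/unregister steps and the cross-thread hand-off are the parts a helper class would wrap, which is where the stability of the underlying listener API matters.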