I am focusing on batch mode, not streaming mode. I would argue that
Dataset.observe() is equally useful for large batch processing. If you
need some motivating use cases, please let me know.
Anyhow, the documentation of observe states that it works for both batch
and streaming. And in batch mode, the helper class Observation helps
reduce code and avoid repetition.
The PySpark implementation of the Observation class can implement *all*
methods by merely calling into their JVM counterparts, where the locking,
listening, registration and un-registration happens. I think this
qualifies as: "all the logic happens in the JVM". All that is
transferred to Python is a row's data. No listeners needed.
Enrico
On 16.03.21 at 00:13, Jungtaek Lim wrote:
If I remember correctly, the major audience of the "observe" API is
Structured Streaming, micro-batch mode. Judging from the example, the
abstraction in 2 would not work with Structured Streaming. It
could still be done with a callback, but the question remains how
much complexity the abstraction actually hides.
I see you're focusing on PySpark - I'm not sure whether leaving the
query listener / streaming query listener out of PySpark was
intentional, but if there is a valid reason for it, I'm not sure we
would want to expose them to PySpark in any way. 2 doesn't make sense
to me with PySpark - how do you ensure all the logic happens in the JVM
while still being able to use these values from PySpark?
(I see there's support for listeners with DStream in PySpark, so there
might be reasons not to add the same for SQL/SS. Probably a lesson
learned?)
On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack <m...@enrico.minack.dev>
wrote:
Hi Spark-Devs,
the observable metrics that have been added to the Dataset API in
3.0.0 are a great improvement over the Accumulator APIs that seem
to have much weaker guarantees. I have two questions regarding
follow-up contributions:
*1. Add observe to Python DataFrame*
As far as I can see from the master branch, there is no equivalent in
the Python API. Is this something planned to happen, or is it missing
because there are no listeners in PySpark, which would render this
method useless in Python? I would be happy to contribute here.
*2. Add Observation class to simplify result access*
The Dataset.observe method requires users to register listeners
<https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#observe(name:String,expr:org.apache.spark.sql.Column,exprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]>
with QueryExecutionListener or StreamingQueryListener to obtain
the result. I think for simple setups, this could be hidden behind
a common helper class here called Observation, which reduces the
usage of observe to three lines of code:
    // our Dataset (this does not count as a line of code)
    val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")

    // define the observation we want to make
    val observation = Observation("stats", count($"id"), sum($"id"))

    // add the observation to the Dataset and execute an action on it
    val cnt = df.observe(observation).count()

    // retrieve the result
    assert(observation.get === Row(4, 15))
The Observation class can handle the registration and
de-registration of the listener, as well as properly accessing the
result across thread boundaries.
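
To make the idea more concrete, here is a rough sketch of how such a helper could be built on top of the existing QueryExecutionListener API for batch queries. This is only an illustration, not a proposed final implementation: the class name SimpleObservation, its method on, and the object SimpleObservationExample are hypothetical names chosen for this sketch, and it assumes that the observed Dataset is executed by exactly one action.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions.{count, sum}
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical helper (illustrative only): hides listener registration,
// de-registration and the hand-over of the result between threads.
final class SimpleObservation(name: String, expr: Column, exprs: Column*) {
  // Completed by the listener thread, awaited by the calling thread.
  private val result = Promise[Row]()

  // Registers a listener on the Dataset's session and returns the observed Dataset.
  def on[T](ds: Dataset[T]): Dataset[T] = {
    val spark = ds.sparkSession
    val listener = new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        // Only react to query executions that carry our named metrics.
        qe.observedMetrics.get(name).foreach { row =>
          if (result.trySuccess(row)) spark.listenerManager.unregister(this)
        }

      // Failure handling is left out of this sketch on purpose.
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
    }
    spark.listenerManager.register(listener)
    ds.observe(name, expr, exprs: _*)
  }

  // Blocks until an action on the observed Dataset has completed.
  def get: Row = Await.result(result.future, Duration.Inf)
}

object SimpleObservationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("observation-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")
    val observation = new SimpleObservation("stats", count($"id"), sum($"id"))

    // Run any action on the observed Dataset, then read the metrics.
    observation.on(df).collect()
    println(observation.get)

    spark.stop()
  }
}
```

The Promise is what carries the Row from the thread that invokes the listener to the thread that calls get. A real implementation would also have to decide what get should do if the query fails or no action is ever run, which this sketch ignores.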
With *2.*, the observe method can be added to PySpark without
introducing listeners there at all. All the logic is happening in
the JVM.
Thanks for your thoughts on this.
Regards,
Enrico