This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d948be6c963 [SPARK-45677] Better error logging for Observation API
d948be6c963 is described below
commit d948be6c963d9d41ed1e1b8446e99c2f421b7f50
Author: Wei Liu <[email protected]>
AuthorDate: Fri Oct 27 09:44:59 2023 +0900
[SPARK-45677] Better error logging for Observation API
### What changes were proposed in this pull request?
The simplified API for observed metrics (`Observation`) does not support
streaming DataFrames. The error message should tell users why it is not
supported and what to do instead.
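
For illustration only, here is a minimal sketch of the listener-based alternative that the new message recommends. It is not part of this commit. It assumes Spark is on the classpath, and the rate source, the metric name `my_metrics`, the object name, and the 10-second timeout are hypothetical choices made for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, lit, max}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Hypothetical example object; not taken from the Spark code base.
object ObservedMetricsStreamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("observed-metrics-sketch")
      .getOrCreate()

    // The listener receives one progress event per microbatch, so the
    // observed metrics arrive as a sequence of values rather than one.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // observedMetrics is a java.util.Map keyed by the name given to observe().
        val row = event.progress.observedMetrics.get("my_metrics")
        if (row != null) println(s"observed metrics for this microbatch: $row")
      }
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })

    // Assumed input: the built-in rate source, which emits (timestamp, value) rows.
    val stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Use the name-based Dataset.observe, not the Observation class,
    // because Observation rejects streaming Datasets.
    val observed = stream.observe(
      "my_metrics",
      count(lit(1)).as("rows"),
      max(col("value")).as("maxValue"))

    val query = observed.writeStream.format("console").start()
    query.awaitTermination(10000) // run for about 10 seconds, then shut down
    query.stop()
    spark.stop()
  }
}
```

Without a listener, the same per-microbatch values should be reachable through `query.lastProgress` and `query.recentProgress`, as the new error message notes.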
### Why are the changes needed?
To make Spark more user-friendly.
### Does this PR introduce _any_ user-facing change?
Only a more informative error message.
### How was this patch tested?
No tests needed.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43542 from WweiL/SPARK-45677-observe-error.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
sql/core/src/main/scala/org/apache/spark/sql/Observation.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala b/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
index 14c4983794b..cb6fbfbb2ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
@@ -73,7 +73,10 @@ class Observation(val name: String) {
    */
   private[spark] def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = {
     if (ds.isStreaming) {
-      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+      throw new IllegalArgumentException("Observation does not support streaming Datasets." +
+        "This is because there will be multiple observed metrics as microbatches are constructed" +
+        ". Please register a StreamingQueryListener and get the metric for each microbatch in " +
+        "QueryProgressEvent.progress, or use query.lastProgress or query.recentProgress.")
     }
     register(ds)
     ds.observe(name, expr, exprs: _*)