This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cc7f5124057f [SPARK-54229][PYTHON] Make PySparkLogger in UDFs store one log entry per log function call
cc7f5124057f is described below
commit cc7f5124057fed783e311ec98322463cdeb8a72c
Author: Takuya Ueshin <[email protected]>
AuthorDate: Fri Nov 7 12:17:06 2025 -0800
[SPARK-54229][PYTHON] Make PySparkLogger in UDFs store one log entry per log function call
### What changes were proposed in this pull request?
Makes `PySparkLogger` in UDFs store one log entry per log function call.
### Why are the changes needed?
Currently, if `PySparkLogger` is used in UDFs, it produces two entries per log function call: it automatically adds a handler that writes to `sys.stderr`, and the captured stderr output is then recorded as a second entry.
The additional handler is not needed when running inside the `capture_outputs` context.
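The duplication follows from standard Python `logging` behavior: a record is handled by the logger's own handlers and then propagates to the root logger's handlers. A minimal sketch with plain `logging` (the stdout handler below is only a stand-in for the worker's capture handler, not the actual `capture_outputs` machinery):
```python
import logging
import sys

# Stand-in for the JSON handler that `capture_outputs` installs on the
# root logger; the real worker also re-captures stderr as log entries.
root = logging.getLogger()
root.addHandler(logging.StreamHandler(sys.stdout))

# A logger that, like `PySparkLogger` before this change, attaches its
# own stderr handler unconditionally.
logger = logging.getLogger("test")
logger.addHandler(logging.StreamHandler(sys.stderr))

# One call, two handled records: once by the logger's own stderr handler
# (re-captured by the worker as the extra "stderr" entry below) and once
# by the root handler via propagation.
logger.warning("one call")
```
Skipping the extra handler when a capture handler is already present on the root logger, as the diff below does, leaves exactly one entry per call.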
<details>
<summary>example</summary>
```python
>>> from pyspark.sql.functions import *
>>> from pyspark.logger import PySparkLogger
>>>
>>> @udf
... def pyspark_logger_test_udf(x):
...     logger = PySparkLogger.getLogger("test")
...     logger.warn(f"WARN level message: {x}", x=x)
...     return str(x)
...
>>>
>>> spark.conf.set("spark.sql.pyspark.worker.logging.enabled", True)
>>>
>>> spark.range(1).select(pyspark_logger_test_udf("id")).show()
...
```
</details>
- before
```py
>>> spark.table("system.session.python_worker_logs").orderBy("ts").show(truncate=False)
+--------------------------+-------+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------+---------+------+
|ts                        |level  |msg                                                                                                                         |context                                       |exception|logger|
+--------------------------+-------+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------+---------+------+
|2025-11-06 18:40:03.658127|WARNING|WARN level message: 0                                                                                                       |{func_name -> pyspark_logger_test_udf, x -> 0}|NULL     |test  |
|2025-11-06 18:40:03.66424 |ERROR  |{"ts": "2025-11-06 18:40:03.658", "level": "WARNING", "logger": "test", "msg": "WARN level message: 0", "context": {"x": 0}}|{func_name -> pyspark_logger_test_udf}        |NULL     |stderr|
+--------------------------+-------+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------+---------+------+
```
- after
```py
>>> spark.table("system.session.python_worker_logs").orderBy("ts").show(truncate=False)
+--------------------------+-------+---------------------+----------------------------------------------+---------+------+
|ts                        |level  |msg                  |context                                       |exception|logger|
+--------------------------+-------+---------------------+----------------------------------------------+---------+------+
|2025-11-06 18:41:48.601256|WARNING|WARN level message: 0|{func_name -> pyspark_logger_test_udf, x -> 0}|NULL     |test  |
+--------------------------+-------+---------------------+----------------------------------------------+---------+------+
```
### Does this PR introduce _any_ user-facing change?
Yes, `PySparkLogger` in UDFs will store one log entry per log function call.
### How was this patch tested?
Added related tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52926 from ueshin/issues/SPARK-54229/pyspark_logger.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Takuya Ueshin <[email protected]>
---
 python/pyspark/logger/logger.py       | 12 ++++++++++
 python/pyspark/sql/tests/test_udf.py  | 30 +++++++++++++++++++++++
 python/pyspark/sql/tests/test_udtf.py | 45 +++++++++++++++++++++++++++++++++++
 3 files changed, 87 insertions(+)
diff --git a/python/pyspark/logger/logger.py b/python/pyspark/logger/logger.py
index b60561f24c99..72179b033bb3 100644
--- a/python/pyspark/logger/logger.py
+++ b/python/pyspark/logger/logger.py
@@ -140,7 +140,19 @@ class PySparkLogger(logging.Logger):
     """

     def __init__(self, name: str = "PySparkLogger"):
+        from pyspark.logger.worker_io import JSONFormatterWithMarker
+
         super().__init__(name, level=logging.WARN)
+
+        root_logger = logging.getLogger()
+        if any(
+            isinstance(h, logging.StreamHandler)
+            and isinstance(h.formatter, JSONFormatterWithMarker)
+            for h in root_logger.handlers
+        ):
+            # Likely in the `capture_outputs` context, so don't add a handler
+            return
+
         _handler = logging.StreamHandler()
         self.addHandler(_handler)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index d6bc8ad28b33..0ceb745c5860 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -48,6 +48,7 @@ from pyspark.sql.types import (
     VariantVal,
 )
 from pyspark.errors import AnalysisException, PythonException, PySparkTypeError
+from pyspark.logger import PySparkLogger
 from pyspark.testing.objects import ExamplePoint, ExamplePointUDT
 from pyspark.testing.sqlutils import (
     ReusedSQLTestCase,
@@ -1657,6 +1658,35 @@ class BaseUDFTestsMixin(object):
             ],
         )

+    @unittest.skipIf(is_remote_only(), "Requires JVM access")
+    def test_udf_with_pyspark_logger(self):
+        @udf
+        def my_udf(x):
+            logger = PySparkLogger.getLogger("PySparkLogger")
+            logger.warning("PySparkLogger test", x=x)
+            return str(x)
+
+        with self.sql_conf({"spark.sql.pyspark.worker.logging.enabled": "true"}):
+            assertDataFrameEqual(
+                self.spark.range(2).select(my_udf("id").alias("result")),
+                [Row(result=str(i)) for i in range(2)],
+            )
+
+            logs = self.spark.table("system.session.python_worker_logs")
+
+            assertDataFrameEqual(
+                logs.select("level", "msg", "context", "logger"),
+                [
+                    Row(
+                        level="WARNING",
+                        msg="PySparkLogger test",
+                        context={"func_name": my_udf.__name__, "x": str(i)},
+                        logger="PySparkLogger",
+                    )
+                    for i in range(2)
+                ],
+            )
+

 class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
     @classmethod
diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py
index b86a2624acd5..70623a5f62d4 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -65,6 +65,7 @@ from pyspark.sql.types import (
     StructType,
     VariantVal,
 )
+from pyspark.logger import PySparkLogger
 from pyspark.testing import assertDataFrameEqual, assertSchemaEqual
 from pyspark.testing.objects import ExamplePoint, ExamplePointUDT
 from pyspark.testing.sqlutils import (
@@ -3135,6 +3136,50 @@ class BaseUDTFTestsMixin:
             ],
         )

+    @unittest.skipIf(is_remote_only(), "Requires JVM access")
+    def test_udtf_analyze_with_pyspark_logger(self):
+        @udtf
+        class TestUDTFWithLogging:
+            @staticmethod
+            def analyze(x: AnalyzeArgument) -> AnalyzeResult:
+                logger = PySparkLogger.getLogger("PySparkLogger")
+                logger.warning(f"udtf analyze: {x.dataType.json()}", dt=x.dataType.json())
+                return AnalyzeResult(StructType().add("a", IntegerType()).add("b", IntegerType()))
+
+            def eval(self, x: int):
+                yield x * 2, x + 10
+
+        with self.sql_conf({"spark.sql.pyspark.worker.logging.enabled": "true"}):
+            assertDataFrameEqual(
+                self.spark.createDataFrame([(5,), (10,)], ["x"]).lateralJoin(
+                    TestUDTFWithLogging(col("x").outer())
+                ),
+                [Row(x=x, a=x * 2, b=x + 10) for x in [5, 10]],
+            )
+
+            logs = self.spark.table("system.session.python_worker_logs")
+
+            assertDataFrameEqual(
+                logs.select(
+                    "level",
+                    "msg",
+                    col("context.class_name").alias("context_class_name"),
+                    col("context.func_name").alias("context_func_name"),
+                    col("context.dt").alias("context_dt"),
+                    "logger",
+                ).distinct(),
+                [
+                    Row(
+                        level="WARNING",
+                        msg='udtf analyze: "long"',
+                        context_class_name="TestUDTFWithLogging",
+                        context_func_name="analyze",
+                        context_dt='"long"',
+                        logger="PySparkLogger",
+                    )
+                ],
+            )
+

 class UDTFTests(BaseUDTFTestsMixin, ReusedSQLTestCase):
     @classmethod