This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cc7f5124057f [SPARK-54229][PYTHON] Make PySparkLogger in UDFs store one log entry per log function call
cc7f5124057f is described below

commit cc7f5124057fed783e311ec98322463cdeb8a72c
Author: Takuya Ueshin <[email protected]>
AuthorDate: Fri Nov 7 12:17:06 2025 -0800

    [SPARK-54229][PYTHON] Make PySparkLogger in UDFs store one log entry per log function call
    
    ### What changes were proposed in this pull request?
    
    Makes `PySparkLogger` in UDFs store one log entry per log function call.
    
    ### Why are the changes needed?
    
    Currently, if `PySparkLogger` is used in UDFs, each log function call produces two entries: `PySparkLogger` automatically adds a handler that writes to `sys.stderr`, and the worker's output capture records that `stderr` output as a second entry (visible in the "before" output below as an `ERROR` row under the `stderr` logger).
    
    The additional handler is not needed inside the `capture_outputs` context, where the root logger already has a handler that emits JSON-formatted records.
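
    As a minimal standalone sketch (plain `logging`, outside Spark) of the duplication mechanism: when both a child logger and the root logger have a `StreamHandler`, a single call emits the record twice, because records propagate from the child to the root by default.

    ```python
    import logging
    import sys

    # A root handler, standing in for the one installed by the worker's
    # output-capturing setup.
    logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))

    # A child logger that also adds its own stderr handler, as
    # `PySparkLogger.__init__` did unconditionally before this change.
    logger = logging.getLogger("test")
    logger.addHandler(logging.StreamHandler(sys.stderr))

    # Handled once by the child's handler and once more by the root's
    # handler via propagation: two lines on stderr for one call.
    logger.warning("WARN level message")
    ```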
    
    <details>
    <summary>example</summary>
    
    ```python
    >>> from pyspark.sql.functions import *
    >>> from pyspark.logger import PySparkLogger
    >>>
    >>> @udf
    ... def pyspark_logger_test_udf(x):
    ...     logger = PySparkLogger.getLogger("test")
    ...     logger.warn(f"WARN level message: {x}", x=x)
    ...     return str(x)
    ...
    >>>
    >>> spark.conf.set("spark.sql.pyspark.worker.logging.enabled", True)
    >>>
    >>> spark.range(1).select(pyspark_logger_test_udf("id")).show()
    ...
    ```
    
    </details>
    
    - before
    
    ```py
    >>> spark.table("system.session.python_worker_logs").orderBy("ts").show(truncate=False)
    +--------------------------+-------+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------+---------+------+
    |ts                        |level  |msg                                                                                                                         |context                                       |exception|logger|
    +--------------------------+-------+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------+---------+------+
    |2025-11-06 18:40:03.658127|WARNING|WARN level message: 0                                                                                                       |{func_name -> pyspark_logger_test_udf, x -> 0}|NULL     |test  |
    |2025-11-06 18:40:03.66424 |ERROR  |{"ts": "2025-11-06 18:40:03.658", "level": "WARNING", "logger": "test", "msg": "WARN level message: 0", "context": {"x": 0}}|{func_name -> pyspark_logger_test_udf}        |NULL     |stderr|
    +--------------------------+-------+----------------------------------------------------------------------------------------------------------------------------+----------------------------------------------+---------+------+
    ```
    
    - after
    
    ```py
    >>> spark.table("system.session.python_worker_logs").orderBy("ts").show(truncate=False)
    +--------------------------+-------+---------------------+----------------------------------------------+---------+------+
    |ts                        |level  |msg                  |context                                       |exception|logger|
    +--------------------------+-------+---------------------+----------------------------------------------+---------+------+
    |2025-11-06 18:41:48.601256|WARNING|WARN level message: 0|{func_name -> pyspark_logger_test_udf, x -> 0}|NULL     |test  |
    +--------------------------+-------+---------------------+----------------------------------------------+---------+------+
    ```
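
    The fix (in `python/pyspark/logger/logger.py` below) makes `PySparkLogger.__init__` skip its own `stderr` handler when the root logger already carries the worker's JSON-emitting handler. A rough standalone sketch of that check, assuming only what the patch itself imports (`JSONFormatterWithMarker` from `pyspark.logger.worker_io`); the helper name is illustrative:

    ```python
    import logging

    def root_has_worker_json_handler() -> bool:  # hypothetical helper name
        from pyspark.logger.worker_io import JSONFormatterWithMarker

        # The `capture_outputs` setup installs a StreamHandler whose
        # formatter is a JSONFormatterWithMarker on the root logger; its
        # presence signals that no extra stderr handler is needed.
        return any(
            isinstance(h, logging.StreamHandler)
            and isinstance(h.formatter, JSONFormatterWithMarker)
            for h in logging.getLogger().handlers
        )
    ```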
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, `PySparkLogger` in UDFs now stores one log entry per log function call.
    
    ### How was this patch tested?
    
    Added the related tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52926 from ueshin/issues/SPARK-54229/pyspark_logger.
    
    Authored-by: Takuya Ueshin <[email protected]>
    Signed-off-by: Takuya Ueshin <[email protected]>
---
 python/pyspark/logger/logger.py       | 12 ++++++++++
 python/pyspark/sql/tests/test_udf.py  | 30 +++++++++++++++++++++++
 python/pyspark/sql/tests/test_udtf.py | 45 +++++++++++++++++++++++++++++++++++
 3 files changed, 87 insertions(+)

diff --git a/python/pyspark/logger/logger.py b/python/pyspark/logger/logger.py
index b60561f24c99..72179b033bb3 100644
--- a/python/pyspark/logger/logger.py
+++ b/python/pyspark/logger/logger.py
@@ -140,7 +140,19 @@ class PySparkLogger(logging.Logger):
     """
 
     def __init__(self, name: str = "PySparkLogger"):
+        from pyspark.logger.worker_io import JSONFormatterWithMarker
+
         super().__init__(name, level=logging.WARN)
+
+        root_logger = logging.getLogger()
+        if any(
+            isinstance(h, logging.StreamHandler)
+            and isinstance(h.formatter, JSONFormatterWithMarker)
+            for h in root_logger.handlers
+        ):
+            # Likely in the `capture_outputs` context, so don't add a handler
+            return
+
         _handler = logging.StreamHandler()
         self.addHandler(_handler)
 
diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index d6bc8ad28b33..0ceb745c5860 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -48,6 +48,7 @@ from pyspark.sql.types import (
     VariantVal,
 )
 from pyspark.errors import AnalysisException, PythonException, PySparkTypeError
+from pyspark.logger import PySparkLogger
 from pyspark.testing.objects import ExamplePoint, ExamplePointUDT
 from pyspark.testing.sqlutils import (
     ReusedSQLTestCase,
@@ -1657,6 +1658,35 @@ class BaseUDFTestsMixin(object):
             ],
         )
 
+    @unittest.skipIf(is_remote_only(), "Requires JVM access")
+    def test_udf_with_pyspark_logger(self):
+        @udf
+        def my_udf(x):
+            logger = PySparkLogger.getLogger("PySparkLogger")
+            logger.warning("PySparkLogger test", x=x)
+            return str(x)
+
+        with self.sql_conf({"spark.sql.pyspark.worker.logging.enabled": "true"}):
+            assertDataFrameEqual(
+                self.spark.range(2).select(my_udf("id").alias("result")),
+                [Row(result=str(i)) for i in range(2)],
+            )
+
+        logs = self.spark.table("system.session.python_worker_logs")
+
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            [
+                Row(
+                    level="WARNING",
+                    msg="PySparkLogger test",
+                    context={"func_name": my_udf.__name__, "x": str(i)},
+                    logger="PySparkLogger",
+                )
+                for i in range(2)
+            ],
+        )
+
 
 class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
     @classmethod
diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py
index b86a2624acd5..70623a5f62d4 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -65,6 +65,7 @@ from pyspark.sql.types import (
     StructType,
     VariantVal,
 )
+from pyspark.logger import PySparkLogger
 from pyspark.testing import assertDataFrameEqual, assertSchemaEqual
 from pyspark.testing.objects import ExamplePoint, ExamplePointUDT
 from pyspark.testing.sqlutils import (
@@ -3135,6 +3136,50 @@ class BaseUDTFTestsMixin:
             ],
         )
 
+    @unittest.skipIf(is_remote_only(), "Requires JVM access")
+    def test_udtf_analyze_with_pyspark_logger(self):
+        @udtf
+        class TestUDTFWithLogging:
+            @staticmethod
+            def analyze(x: AnalyzeArgument) -> AnalyzeResult:
+                logger = PySparkLogger.getLogger("PySparkLogger")
+                logger.warning(f"udtf analyze: {x.dataType.json()}", dt=x.dataType.json())
+                return AnalyzeResult(StructType().add("a", IntegerType()).add("b", IntegerType()))
+
+            def eval(self, x: int):
+                yield x * 2, x + 10
+
+        with self.sql_conf({"spark.sql.pyspark.worker.logging.enabled": "true"}):
+            assertDataFrameEqual(
+                self.spark.createDataFrame([(5,), (10,)], ["x"]).lateralJoin(
+                    TestUDTFWithLogging(col("x").outer())
+                ),
+                [Row(x=x, a=x * 2, b=x + 10) for x in [5, 10]],
+            )
+
+        logs = self.spark.table("system.session.python_worker_logs")
+
+        assertDataFrameEqual(
+            logs.select(
+                "level",
+                "msg",
+                col("context.class_name").alias("context_class_name"),
+                col("context.func_name").alias("context_func_name"),
+                col("context.dt").alias("context_dt"),
+                "logger",
+            ).distinct(),
+            [
+                Row(
+                    level="WARNING",
+                    msg='udtf analyze: "long"',
+                    context_class_name="TestUDTFWithLogging",
+                    context_func_name="analyze",
+                    context_dt='"long"',
+                    logger="PySparkLogger",
+                )
+            ],
+        )
+
 
 class UDTFTests(BaseUDTFTestsMixin, ReusedSQLTestCase):
     @classmethod

