This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new a68c2e6ebe35 [SPARK-54246][PYTHON][DOCS] Add the user guide for python 
worker logging
a68c2e6ebe35 is described below

commit a68c2e6ebe35fb69da6b57e6d75130c875766ed6
Author: Takuya Ueshin <[email protected]>
AuthorDate: Fri Nov 7 20:42:39 2025 -0800

    [SPARK-54246][PYTHON][DOCS] Add the user guide for python worker logging
    
    ### What changes were proposed in this pull request?
    
    Adds the user guide for python worker logging.
    
    ### Why are the changes needed?
    
    The documentation for python worker logging will be in the user guide: 
"Chapter 4: Bug Busting - Debugging PySpark".
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes, asked cursor for the draft.
    
    Closes #52949 from ueshin/issues/SPARK-54246/user_guide.
    
    Authored-by: Takuya Ueshin <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit b144965891495619302ae23633af14d252b90b8f)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 python/docs/source/user_guide/bugbusting.ipynb | 359 ++++++++++++++++++++++++-
 1 file changed, 358 insertions(+), 1 deletion(-)

diff --git a/python/docs/source/user_guide/bugbusting.ipynb 
b/python/docs/source/user_guide/bugbusting.ipynb
index 8e64bda1175c..75150596600e 100644
--- a/python/docs/source/user_guide/bugbusting.ipynb
+++ b/python/docs/source/user_guide/bugbusting.ipynb
@@ -792,7 +792,7 @@
    "id": "09b420ba",
    "metadata": {},
    "source": [
-    "## Disply Stacktraces"
+    "## Display Stacktraces"
    ]
   },
   {
@@ -900,6 +900,363 @@
     "See also [Stack 
Traces](https://spark.apache.org/docs/latest/api/python/development/debugging.html#stack-traces)
 for more details."
    ]
   },
+  {
+   "cell_type": "markdown",
+   "id": "cff22ba8",
+   "metadata": {},
+   "source": [
+    "## Python Worker Logging\n",
+    "\n",
+    "<div class=\"alert alert-block alert-info\">\n",
+    "<b>Note:</b> This section applies to Spark 4.1\n",
+    "</div>\n",
+    "\n",
+    "PySpark provides a logging mechanism for Python workers that execute 
UDFs, UDTFs, Pandas UDFs, and Python data sources. When enabled, all logging 
output (including `print` statements, standard logging, and exceptions) is 
captured and made available for querying and analysis.\n",
+    "\n",
+    "### Enabling Worker Logging\n",
+    "\n",
+    "Worker logging is **disabled by default**. Enable it by setting the Spark 
SQL configuration:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "id": "74786d45",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "0f23fee2",
+   "metadata": {},
+   "source": [
+    "### Accessing Logs\n",
+    "\n",
+    "All captured logs can be queried as a DataFrame:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 21,
+   "id": "9db0c509",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "logs = spark.table(\"system.session.python_worker_logs\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "34bca836",
+   "metadata": {},
+   "source": [
+    "The logs DataFrame contains the following columns:\n",
+    "\n",
+    "- **ts**: Timestamp of the log entry\n",
+    "- **level**: Log level (e.g., `\"INFO\"`, `\"WARNING\"`, `\"ERROR\"`)\n",
+    "- **logger**: Logger name (e.g., custom logger name, `\"stdout\"`, 
`\"stderr\"`)\n",
+    "- **msg**: The log message\n",
+    "- **context**: A map containing contextual information (e.g., 
`func_name`, `class_name`, custom fields)\n",
+    "- **exception**: Exception details (if an exception was logged)\n",
+    "\n",
+    "### Examples\n",
+    "\n",
+    "#### Basic UDF Logging"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 22,
+   "id": "4cb5bbca",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+------------+\n",
+      "|my_udf(text)|\n",
+      "+------------+\n",
+      "|       HELLO|\n",
+      "|       WORLD|\n",
+      "+------------+\n",
+      "\n",
+      
"+-------+------------------------+----------------+---------------------+\n",
+      "|level  |msg                     |logger          |context              
|\n",
+      
"+-------+------------------------+----------------+---------------------+\n",
+      "|INFO   |Processing value: hello |my_custom_logger|{func_name -> 
my_udf}|\n",
+      "|WARNING|This is a warning       |my_custom_logger|{func_name -> 
my_udf}|\n",
+      "|INFO   |This is a stdout message|stdout          |{func_name -> 
my_udf}|\n",
+      "|ERROR  |This is a stderr message|stderr          |{func_name -> 
my_udf}|\n",
+      "|INFO   |Processing value: world |my_custom_logger|{func_name -> 
my_udf}|\n",
+      "|WARNING|This is a warning       |my_custom_logger|{func_name -> 
my_udf}|\n",
+      "|INFO   |This is a stdout message|stdout          |{func_name -> 
my_udf}|\n",
+      "|ERROR  |This is a stderr message|stderr          |{func_name -> 
my_udf}|\n",
+      
"+-------+------------------------+----------------+---------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "from pyspark.sql.functions import udf\n",
+    "import logging\n",
+    "import sys\n",
+    "\n",
+    "@udf(\"string\")\n",
+    "def my_udf(value):\n",
+    "    logger = logging.getLogger(\"my_custom_logger\")\n",
+    "    logger.setLevel(logging.INFO)  # Set level to INFO to capture info 
messages\n",
+    "    logger.info(f\"Processing value: {value}\")\n",
+    "    logger.warning(\"This is a warning\")\n",
+    "    print(\"This is a stdout message\")  # INFO level, logger=stdout\n",
+    "    print(\"This is a stderr message\", file=sys.stderr)  # ERROR level, 
logger=stderr\n",
+    "    return value.upper()\n",
+    "\n",
+    "# Enable logging and execute\n",
+    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
+    "df = spark.createDataFrame([(\"hello\",), (\"world\",)], [\"text\"])\n",
+    "df.select(my_udf(\"text\")).show()\n",
+    "\n",
+    "# Query the logs\n",
+    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "logs.select(\"level\", \"msg\", \"logger\", 
\"context\").show(truncate=False)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "15a80ffb",
+   "metadata": {},
+   "source": [
+    "#### Logging with Custom Context\n",
+    "\n",
+    "You can add custom context information to your logs:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 23,
+   "id": "427a06c5",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|contextual_udf(test)|\n",
+      "+--------------------+\n",
+      "|                test|\n",
+      "+--------------------+\n",
+      "\n",
+      
"+-----------------------------+---------------------------------------------------------------------+\n",
+      "|msg                          |context                                  
                            |\n",
+      
"+-----------------------------+---------------------------------------------------------------------+\n",
+      "|Processing with extra context|{func_name -> contextual_udf, user_id -> 
123, operation -> transform}|\n",
+      
"+-----------------------------+---------------------------------------------------------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "from pyspark.sql.functions import lit, udf\n",
+    "import logging\n",
+    "\n",
+    "@udf(\"string\")\n",
+    "def contextual_udf(value):\n",
+    "    logger = logging.getLogger(\"contextual\")\n",
+    "    logger.warning(\n",
+    "        \"Processing with extra context\",\n",
+    "        extra={\"context\": {\"user_id\": 123, \"operation\": 
\"transform\"}}\n",
+    "    )\n",
+    "    return value\n",
+    "\n",
+    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
+    "spark.range(1).select(contextual_udf(lit(\"test\"))).show()\n",
+    "\n",
+    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "logs.filter(\"logger = 'contextual'\").select(\"msg\", 
\"context\").show(truncate=False)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "a19db296",
+   "metadata": {},
+   "source": [
+    "The context includes both automatic fields (like `func_name`) and custom 
fields (like `user_id`, `operation`).\n",
+    "\n",
+    "#### Exception Logging\n",
+    "\n",
+    "Exceptions are automatically captured with full stack traces:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 24,
+   "id": "3ab34a4c",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+------------------+\n",
+      "|failing_udf(value)|\n",
+      "+------------------+\n",
+      "|                -1|\n",
+      "|                20|\n",
+      "+------------------+\n",
+      "\n",
+      
"+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
+      "|msg                      |exception                                    
                                                                                
                 |\n",
+      
"+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
+      "|Division by zero occurred|{ZeroDivisionError, division by zero, 
[{NULL, failing_udf, 
/var/folders/r8/0v7zwfbd59q4ym2gn6kxjq8h0000gp/T/ipykernel_79089/916837455.py, 
8}]}|\n",
+      
"+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "from pyspark.sql.functions import udf\n",
+    "import logging\n",
+    "\n",
+    "@udf(\"int\")\n",
+    "def failing_udf(x):\n",
+    "    logger = logging.getLogger(\"error_handler\")\n",
+    "    try:\n",
+    "        result = 100 / x\n",
+    "    except ZeroDivisionError:\n",
+    "        logger.exception(\"Division by zero occurred\")\n",
+    "        return -1\n",
+    "    return int(result)\n",
+    "\n",
+    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
+    "spark.createDataFrame([(0,), (5,)], 
[\"value\"]).select(failing_udf(\"value\")).show()\n",
+    "\n",
+    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "logs.filter(\"logger = 'error_handler'\").select(\"msg\", 
\"exception\").show(truncate=False)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "e54f6ac3",
+   "metadata": {},
+   "source": [
+    "#### UDTF and Python Data Source Logging\n",
+    "\n",
+    "Worker logging also works with UDTFs and Python Data Sources, capturing 
both the class and function names:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 25,
+   "id": "02d454b0",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+-----------+-----+------+\n",
+      "|       text| word|length|\n",
+      "+-----------+-----+------+\n",
+      "|hello world|hello|     5|\n",
+      "|hello world|world|     5|\n",
+      "+-----------+-----+------+\n",
+      "\n",
+      
"+-----------------------------+---------------------------------------------------------------------+\n",
+      "|msg                          |context                                  
                            |\n",
+      
"+-----------------------------+---------------------------------------------------------------------+\n",
+      "|Processing 2 words           |{func_name -> eval, class_name -> 
WordSplitter}                      |\n",
+      
"+-----------------------------+---------------------------------------------------------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "from pyspark.sql.functions import col, udtf\n",
+    "import logging\n",
+    "\n",
+    "@udtf(returnType=\"word: string, length: int\")\n",
+    "class WordSplitter:\n",
+    "    def eval(self, text: str):\n",
+    "        logger = logging.getLogger(\"udtf_logger\")\n",
+    "        logger.setLevel(logging.INFO)  # Set level to INFO to capture 
info messages\n",
+    "        words = text.split()\n",
+    "        logger.info(f\"Processing {len(words)} words\")\n",
+    "        for word in words:\n",
+    "            yield (word, len(word))\n",
+    "\n",
+    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
+    "df = spark.createDataFrame([(\"hello world\",)], [\"text\"])\n",
+    "df.lateralJoin(WordSplitter(col(\"text\").outer())).show()\n",
+    "\n",
+    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "logs.filter(\"logger = 'udtf_logger'\").select(\"msg\", 
\"context\").show(truncate=False)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "9d4c119b",
+   "metadata": {},
+   "source": [
+    "### Querying and Analyzing Logs\n",
+    "\n",
+    "You can use standard DataFrame operations to analyze logs:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 26,
+   "id": "5b061011",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+-------+-----+\n",
+      "|  level|count|\n",
+      "+-------+-----+\n",
+      "|   INFO|    5|\n",
+      "|WARNING|    3|\n",
+      "|  ERROR|    3|\n",
+      "+-------+-----+\n",
+      "\n",
+      "...\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "\n",
+    "# Count logs by level\n",
+    "logs.groupBy(\"level\").count().show()\n",
+    "\n",
+    "# Find all errors\n",
+    "logs.filter(\"level = 'ERROR'\").show()\n",
+    "\n",
+    "# Logs from a specific function\n",
+    "logs.filter(\"context.func_name = 'my_udf'\").show()\n",
+    "\n",
+    "# Logs with exceptions\n",
+    "logs.filter(\"exception is not null\").show()\n",
+    "\n",
+    "# Time-based analysis\n",
+    "logs.orderBy(\"ts\").show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7eaa72b9",
+   "metadata": {},
+   "source": [
+    "\n"
+   ]
+  },
   {
    "attachments": {},
    "cell_type": "markdown",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to