This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new a68c2e6ebe35 [SPARK-54246][PYTHON][DOCS] Add the user guide for python worker logging
a68c2e6ebe35 is described below
commit a68c2e6ebe35fb69da6b57e6d75130c875766ed6
Author: Takuya Ueshin <[email protected]>
AuthorDate: Fri Nov 7 20:42:39 2025 -0800
[SPARK-54246][PYTHON][DOCS] Add the user guide for python worker logging
### What changes were proposed in this pull request?
Adds the user guide for python worker logging.
### Why are the changes needed?
The documentation for python worker logging will be in the user guide:
"Chapter 4: Bug Busting - Debugging PySpark".
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
N/A
### Was this patch authored or co-authored using generative AI tooling?
Yes, asked cursor for the draft.
Closes #52949 from ueshin/issues/SPARK-54246/user_guide.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b144965891495619302ae23633af14d252b90b8f)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
python/docs/source/user_guide/bugbusting.ipynb | 359 ++++++++++++++++++++++++-
1 file changed, 358 insertions(+), 1 deletion(-)
diff --git a/python/docs/source/user_guide/bugbusting.ipynb b/python/docs/source/user_guide/bugbusting.ipynb
index 8e64bda1175c..75150596600e 100644
--- a/python/docs/source/user_guide/bugbusting.ipynb
+++ b/python/docs/source/user_guide/bugbusting.ipynb
@@ -792,7 +792,7 @@
"id": "09b420ba",
"metadata": {},
"source": [
- "## Disply Stacktraces"
+ "## Display Stacktraces"
]
},
{
@@ -900,6 +900,363 @@
"See also [Stack Traces](https://spark.apache.org/docs/latest/api/python/development/debugging.html#stack-traces) for more details."
]
},
+ {
+ "cell_type": "markdown",
+ "id": "cff22ba8",
+ "metadata": {},
+ "source": [
+ "## Python Worker Logging\n",
+ "\n",
+ "<div class=\"alert alert-block alert-info\">\n",
+ "<b>Note:</b> This section applies to Spark 4.1.\n",
+ "</div>\n",
+ "\n",
+ "PySpark provides a logging mechanism for Python workers that execute UDFs, UDTFs, Pandas UDFs, and Python data sources. When enabled, all logging output (including `print` statements, standard logging, and exceptions) is captured and made available for querying and analysis.\n",
+ "\n",
+ "### Enabling Worker Logging\n",
+ "\n",
+ "Worker logging is **disabled by default**. Enable it by setting the Spark SQL configuration:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "id": "74786d45",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0f23fee2",
+ "metadata": {},
+ "source": [
+ "### Accessing Logs\n",
+ "\n",
+ "All captured logs can be queried as a DataFrame:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "id": "9db0c509",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "logs = spark.table(\"system.session.python_worker_logs\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "34bca836",
+ "metadata": {},
+ "source": [
+ "The logs DataFrame contains the following columns:\n",
+ "\n",
+ "- **ts**: Timestamp of the log entry\n",
+ "- **level**: Log level (e.g., `\"INFO\"`, `\"WARNING\"`, `\"ERROR\"`)\n",
+ "- **logger**: Logger name (e.g., custom logger name, `\"stdout\"`, `\"stderr\"`)\n",
+ "- **msg**: The log message\n",
+ "- **context**: A map containing contextual information (e.g., `func_name`, `class_name`, custom fields)\n",
+ "- **exception**: Exception details (if an exception was logged)\n",
+ "\n",
+ "### Examples\n",
+ "\n",
+ "#### Basic UDF Logging"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "id": "4cb5bbca",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+------------+\n",
+ "|my_udf(text)|\n",
+ "+------------+\n",
+ "|       HELLO|\n",
+ "|       WORLD|\n",
+ "+------------+\n",
+ "\n",
+ "+-------+------------------------+----------------+---------------------+\n",
+ "|level  |msg                     |logger          |context              |\n",
+ "+-------+------------------------+----------------+---------------------+\n",
+ "|INFO   |Processing value: hello |my_custom_logger|{func_name -> my_udf}|\n",
+ "|WARNING|This is a warning       |my_custom_logger|{func_name -> my_udf}|\n",
+ "|INFO   |This is a stdout message|stdout          |{func_name -> my_udf}|\n",
+ "|ERROR  |This is a stderr message|stderr          |{func_name -> my_udf}|\n",
+ "|INFO   |Processing value: world |my_custom_logger|{func_name -> my_udf}|\n",
+ "|WARNING|This is a warning       |my_custom_logger|{func_name -> my_udf}|\n",
+ "|INFO   |This is a stdout message|stdout          |{func_name -> my_udf}|\n",
+ "|ERROR  |This is a stderr message|stderr          |{func_name -> my_udf}|\n",
+ "+-------+------------------------+----------------+---------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "from pyspark.sql.functions import udf\n",
+ "import logging\n",
+ "import sys\n",
+ "\n",
+ "@udf(\"string\")\n",
+ "def my_udf(value):\n",
+ "    logger = logging.getLogger(\"my_custom_logger\")\n",
+ "    logger.setLevel(logging.INFO)  # Set level to INFO to capture info messages\n",
+ "    logger.info(f\"Processing value: {value}\")\n",
+ "    logger.warning(\"This is a warning\")\n",
+ "    print(\"This is a stdout message\")  # INFO level, logger=stdout\n",
+ "    print(\"This is a stderr message\", file=sys.stderr)  # ERROR level, logger=stderr\n",
+ "    return value.upper()\n",
+ "\n",
+ "# Enable logging and execute\n",
+ "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
+ "df = spark.createDataFrame([(\"hello\",), (\"world\",)], [\"text\"])\n",
+ "df.select(my_udf(\"text\")).show()\n",
+ "\n",
+ "# Query the logs\n",
+ "logs = spark.table(\"system.session.python_worker_logs\")\n",
+ "logs.select(\"level\", \"msg\", \"logger\", \"context\").show(truncate=False)"
+ ]
+ },
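The `stdout`/`stderr` rows in the log table come from the worker capturing the process streams while the UDF runs. The mechanism can be sketched with the standard library alone; this is a simplified illustration, not Spark's actual worker code, and the `capture_prints` helper and the `(level, logger, msg)` tuples are invented for this sketch:

```python
import contextlib
import io
import sys

def capture_prints(func, *args):
    # Run func while capturing stdout/stderr, then map each captured line
    # to a (level, logger, msg) record, mirroring how the worker maps
    # print() output to INFO/stdout and stderr output to ERROR/stderr.
    out, err = io.StringIO(), io.StringIO()
    with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err):
        result = func(*args)
    records = [("INFO", "stdout", line) for line in out.getvalue().splitlines()]
    records += [("ERROR", "stderr", line) for line in err.getvalue().splitlines()]
    return result, records

def my_udf(value):
    print("This is a stdout message")
    print("This is a stderr message", file=sys.stderr)
    return value.upper()

result, records = capture_prints(my_udf, "hello")
print(result)
print(records)
```

Because `sys.stderr` is looked up at call time, the `file=sys.stderr` print is caught by `redirect_stderr` and classified separately from ordinary `print` output, just as in the table above.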
+ {
+ "cell_type": "markdown",
+ "id": "15a80ffb",
+ "metadata": {},
+ "source": [
+ "#### Logging with Custom Context\n",
+ "\n",
+ "You can add custom context information to your logs:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 23,
+ "id": "427a06c5",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "|contextual_udf(test)|\n",
+ "+--------------------+\n",
+ "|                test|\n",
+ "+--------------------+\n",
+ "\n",
+ "+-----------------------------+---------------------------------------------------------------------+\n",
+ "|msg                          |context                                                              |\n",
+ "+-----------------------------+---------------------------------------------------------------------+\n",
+ "|Processing with extra context|{func_name -> contextual_udf, user_id -> 123, operation -> transform}|\n",
+ "+-----------------------------+---------------------------------------------------------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "from pyspark.sql.functions import lit, udf\n",
+ "import logging\n",
+ "\n",
+ "@udf(\"string\")\n",
+ "def contextual_udf(value):\n",
+ "    logger = logging.getLogger(\"contextual\")\n",
+ "    logger.warning(\n",
+ "        \"Processing with extra context\",\n",
+ "        extra={\"context\": {\"user_id\": 123, \"operation\": \"transform\"}}\n",
+ "    )\n",
+ "    return value\n",
+ "\n",
+ "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
+ "spark.range(1).select(contextual_udf(lit(\"test\"))).show()\n",
+ "\n",
+ "logs = spark.table(\"system.session.python_worker_logs\")\n",
+ "logs.filter(\"logger = 'contextual'\").select(\"msg\", \"context\").show(truncate=False)"
+ ]
+ },
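The custom fields travel through the standard `logging` `extra` mechanism, which copies each key of the `extra` dict onto the emitted `LogRecord` as an attribute; a worker-side handler can then read and serialize it. A minimal stdlib-only sketch (the `CaptureHandler` class is invented for this illustration and is not Spark's handler):

```python
import logging

class CaptureHandler(logging.Handler):
    """Collects LogRecords in memory instead of writing them anywhere."""
    def __init__(self):
        super().__init__()
        self.records = []
    def emit(self, record):
        self.records.append(record)

logger = logging.getLogger("contextual_demo")
logger.setLevel(logging.INFO)
handler = CaptureHandler()
logger.addHandler(handler)

# extra={...} attaches "context" as an attribute of the LogRecord.
logger.warning(
    "Processing with extra context",
    extra={"context": {"user_id": 123, "operation": "transform"}},
)

record = handler.records[0]
print(record.levelname, record.context)
```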
+ {
+ "cell_type": "markdown",
+ "id": "a19db296",
+ "metadata": {},
+ "source": [
+ "The context includes both automatic fields (like `func_name`) and custom fields (like `user_id`, `operation`).\n",
+ "\n",
+ "#### Exception Logging\n",
+ "\n",
+ "Exceptions are automatically captured with full stack traces:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "id": "3ab34a4c",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+------------------+\n",
+ "|failing_udf(value)|\n",
+ "+------------------+\n",
+ "|                -1|\n",
+ "|                20|\n",
+ "+------------------+\n",
+ "\n",
+ "+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
+ "|msg                      |exception                                                                                                                                     |\n",
+ "+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
+ "|Division by zero occurred|{ZeroDivisionError, division by zero, [{NULL, failing_udf, /var/folders/r8/0v7zwfbd59q4ym2gn6kxjq8h0000gp/T/ipykernel_79089/916837455.py, 8}]}|\n",
+ "+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "from pyspark.sql.functions import udf\n",
+ "import logging\n",
+ "\n",
+ "@udf(\"int\")\n",
+ "def failing_udf(x):\n",
+ "    logger = logging.getLogger(\"error_handler\")\n",
+ "    try:\n",
+ "        result = 100 / x\n",
+ "    except ZeroDivisionError:\n",
+ "        logger.exception(\"Division by zero occurred\")\n",
+ "        return -1\n",
+ "    return int(result)\n",
+ "\n",
+ "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
+ "spark.createDataFrame([(0,), (5,)], [\"value\"]).select(failing_udf(\"value\")).show()\n",
+ "\n",
+ "logs = spark.table(\"system.session.python_worker_logs\")\n",
+ "logs.filter(\"logger = 'error_handler'\").select(\"msg\", \"exception\").show(truncate=False)"
+ ]
+ },
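`logger.exception(...)` is plain standard-library behavior: it logs at ERROR level and attaches the active exception (type, message, traceback) to the record via `exc_info`, which is the information the worker serializes into the `exception` column. A stdlib-only sketch of that behavior (the `CaptureHandler` class is invented for this illustration):

```python
import logging

class CaptureHandler(logging.Handler):
    """Collects LogRecords in memory so we can inspect exc_info."""
    def __init__(self):
        super().__init__()
        self.records = []
    def emit(self, record):
        self.records.append(record)

logger = logging.getLogger("error_handler_demo")
handler = CaptureHandler()
logger.addHandler(handler)

def failing_udf(x):
    try:
        return 100 / x
    except ZeroDivisionError:
        # exception() implies ERROR level and exc_info=True
        logger.exception("Division by zero occurred")
        return -1

failing_udf(0)
record = handler.records[0]
exc_type, exc_value, tb = record.exc_info
print(record.levelname, exc_type.__name__, exc_value)
```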
+ {
+ "cell_type": "markdown",
+ "id": "e54f6ac3",
+ "metadata": {},
+ "source": [
+ "#### UDTF and Python Data Source Logging\n",
+ "\n",
+ "Worker logging also works with UDTFs and Python Data Sources, capturing both the class and function names:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 25,
+ "id": "02d454b0",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----------+-----+------+\n",
+ "|       text| word|length|\n",
+ "+-----------+-----+------+\n",
+ "|hello world|hello|     5|\n",
+ "|hello world|world|     5|\n",
+ "+-----------+-----+------+\n",
+ "\n",
+ "+-----------------------------+---------------------------------------------------------------------+\n",
+ "|msg                          |context                                                              |\n",
+ "+-----------------------------+---------------------------------------------------------------------+\n",
+ "|Processing 2 words           |{func_name -> eval, class_name -> WordSplitter}                      |\n",
+ "+-----------------------------+---------------------------------------------------------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "from pyspark.sql.functions import col, udtf\n",
+ "import logging\n",
+ "\n",
+ "@udtf(returnType=\"word: string, length: int\")\n",
+ "class WordSplitter:\n",
+ "    def eval(self, text: str):\n",
+ "        logger = logging.getLogger(\"udtf_logger\")\n",
+ "        logger.setLevel(logging.INFO)  # Set level to INFO to capture info messages\n",
+ "        words = text.split()\n",
+ "        logger.info(f\"Processing {len(words)} words\")\n",
+ "        for word in words:\n",
+ "            yield (word, len(word))\n",
+ "\n",
+ "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
+ "df = spark.createDataFrame([(\"hello world\",)], [\"text\"])\n",
+ "df.lateralJoin(WordSplitter(col(\"text\").outer())).show()\n",
+ "\n",
+ "logs = spark.table(\"system.session.python_worker_logs\")\n",
+ "logs.filter(\"logger = 'udtf_logger'\").select(\"msg\", \"context\").show(truncate=False)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "9d4c119b",
+ "metadata": {},
+ "source": [
+ "### Querying and Analyzing Logs\n",
+ "\n",
+ "You can use standard DataFrame operations to analyze logs:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "id": "5b061011",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+-----+\n",
+ "|  level|count|\n",
+ "+-------+-----+\n",
+ "|   INFO|    5|\n",
+ "|WARNING|    3|\n",
+ "|  ERROR|    3|\n",
+ "+-------+-----+\n",
+ "\n",
+ "...\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "logs = spark.table(\"system.session.python_worker_logs\")\n",
+ "\n",
+ "# Count logs by level\n",
+ "logs.groupBy(\"level\").count().show()\n",
+ "\n",
+ "# Find all errors\n",
+ "logs.filter(\"level = 'ERROR'\").show()\n",
+ "\n",
+ "# Logs from a specific function\n",
+ "logs.filter(\"context.func_name = 'my_udf'\").show()\n",
+ "\n",
+ "# Logs with exceptions\n",
+ "logs.filter(\"exception is not null\").show()\n",
+ "\n",
+ "# Time-based analysis\n",
+ "logs.orderBy(\"ts\").show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7eaa72b9",
+ "metadata": {},
+ "source": [
+ "\n"
+ ]
+ },
{
"attachments": {},
"cell_type": "markdown",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]