This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch execution-time-code-in-task-sdk
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/execution-time-code-in-task-sdk by this push:
new e1cbfd4290d fixup! Code reviews
e1cbfd4290d is described below
commit e1cbfd4290d0d44962976680976228df3401c43c
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Nov 13 15:50:59 2024 +0000
fixup! Code reviews
---
Dockerfile | 2 +-
Dockerfile.ci | 2 +-
scripts/docker/install_airflow.sh | 2 +-
task_sdk/pyproject.toml | 2 +-
task_sdk/src/airflow/sdk/api/client.py | 8 ++---
.../src/airflow/sdk/execution_time/supervisor.py | 4 +--
task_sdk/src/airflow/sdk/log.py | 37 +++++++++++-----------
7 files changed, 29 insertions(+), 28 deletions(-)
diff --git a/Dockerfile b/Dockerfile
index 5ca9949b021..d9fb1878f11 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -890,7 +890,7 @@ function install_airflow() {
# Similarly we need _a_ file for task_sdk too
mkdir -p ./task_sdk/src/airflow/sdk/
- touch ./task_sdk/src/airflow/sdk/__init__.py
+ echo '__version__ = "0.0.0dev0"' > ./task_sdk/src/airflow/sdk/__init__.py
trap 'rm -f ./providers/src/airflow/providers/__init__.py ./task_sdk/src/airflow/__init__.py 2>/dev/null' EXIT
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 943270aec69..952993984e5 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -660,7 +660,7 @@ function install_airflow() {
# Similarly we need _a_ file for task_sdk too
mkdir -p ./task_sdk/src/airflow/sdk/
- touch ./task_sdk/src/airflow/sdk/__init__.py
+ echo '__version__ = "0.0.0dev0"' > ./task_sdk/src/airflow/sdk/__init__.py
trap 'rm -f ./providers/src/airflow/providers/__init__.py ./task_sdk/src/airflow/__init__.py 2>/dev/null' EXIT
diff --git a/scripts/docker/install_airflow.sh b/scripts/docker/install_airflow.sh
index 2975c50c2d6..27dd25ba260 100644
--- a/scripts/docker/install_airflow.sh
+++ b/scripts/docker/install_airflow.sh
@@ -54,7 +54,7 @@ function install_airflow() {
# Similarly we need _a_ file for task_sdk too
mkdir -p ./task_sdk/src/airflow/sdk/
- touch ./task_sdk/src/airflow/sdk/__init__.py
+ echo '__version__ = "0.0.0dev0"' > ./task_sdk/src/airflow/sdk/__init__.py
trap 'rm -f ./providers/src/airflow/providers/__init__.py ./task_sdk/src/airflow/__init__.py 2>/dev/null' EXIT
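All three install scripts above now seed the stub package with a version string instead of an empty file, presumably so that anything importing or inspecting airflow.sdk during the build finds a parseable __version__. A minimal sketch of such a check (the path and regex here are illustrative, not taken from the scripts):

    # Hypothetical check for the placeholder version written by the echo above.
    import re
    from pathlib import Path

    stub = Path("task_sdk/src/airflow/sdk/__init__.py").read_text()
    match = re.search(r'__version__\s*=\s*"([^"]+)"', stub)
    print(match.group(1) if match else "missing")  # expected: 0.0.0dev0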
diff --git a/task_sdk/pyproject.toml b/task_sdk/pyproject.toml
index cd6daea17a5..63ff4b09e61 100644
--- a/task_sdk/pyproject.toml
+++ b/task_sdk/pyproject.toml
@@ -56,7 +56,7 @@ namespace-packages = ["src/airflow"]
# Ignore Doc rules et al for anything outside of tests
"!src/*" = ["D", "TID253", "S101", "TRY002"]
-# Only have pytest rules in tests - https://github.com/astral-sh/ruff/issues/14205
+# Ignore the pytest rules outside the tests folder - https://github.com/astral-sh/ruff/issues/14205
"!tests/*" = ["PT"]
# Pycharm barfs if this "stub" file has future imports
diff --git a/task_sdk/src/airflow/sdk/api/client.py b/task_sdk/src/airflow/sdk/api/client.py
index 10d073fa13b..82618140297 100644
--- a/task_sdk/src/airflow/sdk/api/client.py
+++ b/task_sdk/src/airflow/sdk/api/client.py
@@ -98,16 +98,16 @@ class TaskInstanceOperations:
"""Tell the API server that this TI has started running."""
body = TIEnterRunningPayload(pid=pid, hostname=get_hostname(), unixname=getuser(), start_date=when)
- self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body))
+ self.client.patch(f"task-instance/{id}/state", content=self.client.encoder.encode(body))
def finish(self, id: uuid.UUID, state: TaskInstanceState, when: datetime):
"""Tell the API server that this TI has reached a terminal state."""
body = TITerminalStatePayload(end_date=when, state=TerminalState(state))
- self.client.patch(f"task_instance/{id}/state", content=self.client.encoder.encode(body))
+ self.client.patch(f"task-instance/{id}/state", content=self.client.encoder.encode(body))
def heartbeat(self, id: uuid.UUID):
- self.client.put(f"task_instance/{id}/heartbeat")
+ self.client.put(f"task-instance/{id}/heartbeat")
class ConnectionOperations:
@@ -150,7 +150,7 @@ class Client(httpx.Client):
self.encoder = msgspec.json.Encoder()
if dry_run:
- # If dry run is requests, install a no op handler so that simple tasks can "heartbeat" using a
+ # If dry run is requested, install a no op handler so that simple tasks can "heartbeat" using a
# real client, but just don't make any HTTP requests
kwargs["transport"] = httpx.MockTransport(noop_handler)
kwargs["base_url"] = "dry-run://server"
diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
index 574238f859f..213e45a79a7 100644
--- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -479,8 +479,8 @@ class WatchedSubprocess:
# Sockets, even the `.makefile()` function don't correctly do line buffering on reading. If a chunk is read
# and it doesn't contain a new line character, `.readline()` will just return the chunk as is.
#
-# This returns a cb suitable for attaching to a `selector` that reads in to a buffer, and yields lines to a
-# (sync) generator
+# This returns a callback suitable for attaching to a `selector` that reads in to a buffer, and yields lines
+# to a (sync) generator
def make_buffered_socket_reader(
gen: Generator[None, bytes, None], buffer_size: int = 4096
) -> Callable[[socket], bool]:
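The reworded comment describes a callback that buffers socket reads and forwards only complete lines to a (sync) generator. A rough standalone sketch of that shape, assuming newline-delimited input (names here are illustrative, not the supervisor's actual implementation):

    import socket
    from collections.abc import Callable, Generator

    def make_line_reader(
        gen: Generator[None, bytes, None], buffer_size: int = 4096
    ) -> Callable[[socket.socket], bool]:
        buffer = bytearray()
        next(gen)  # prime the generator so .send() accepts values

        def on_readable(sock: socket.socket) -> bool:
            chunk = sock.recv(buffer_size)
            if not chunk:
                return False  # EOF: the caller should unregister this socket
            buffer.extend(chunk)
            # Forward complete lines only; keep any partial line buffered.
            while (newline := buffer.find(b"\n")) != -1:
                gen.send(bytes(buffer[: newline + 1]))
                del buffer[: newline + 1]
            return True

        return on_readable

A selectors-based loop would register on_readable for EVENT_READ and drop the socket once it returns False.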
diff --git a/task_sdk/src/airflow/sdk/log.py b/task_sdk/src/airflow/sdk/log.py
index 04083bb42c3..f8e06eda4a6 100644
--- a/task_sdk/src/airflow/sdk/log.py
+++ b/task_sdk/src/airflow/sdk/log.py
@@ -140,23 +140,30 @@ def logging_processors(
structlog.processors.StackInfoRenderer(),
]
- if enable_pretty_log:
- # Imports to suppress showing code from these modules
- import asyncio
- import contextlib
-
- import click
- import httpcore
- import httpx
- import typer
+ # Imports to suppress showing code from these modules. We need the import to get the filepath for
+ # structlog to ignore.
+ import contextlib
+
+ import click
+ import httpcore
+ import httpx
+
+ suppress = (
+ click,
+ contextlib,
+ httpx,
+ httpcore,
+ httpx,
+ )
+ if enable_pretty_log:
rich_exc_formatter = structlog.dev.RichTracebackFormatter(
# These values are picked somewhat arbitrarily to produce useful-but-compact tracebacks. If
# we ever need to change these then they should be configurable.
extra_lines=0,
max_frames=30,
indent_guides=False,
- suppress=[asyncio, httpcore, httpx, contextlib, click, typer],
+ suppress=suppress,
)
my_styles = structlog.dev.ConsoleRenderer.get_default_level_styles()
my_styles["debug"] = structlog.dev.CYAN
@@ -171,23 +178,17 @@ def logging_processors(
}
else:
# Imports to suppress showing code from these modules
- import asyncio
import contextlib
import click
import httpcore
import httpx
- import typer
dict_exc_formatter = structlog.tracebacks.ExceptionDictTransformer(
- use_rich=False, show_locals=False, suppress=(click, typer)
+ use_rich=False, show_locals=False, suppress=suppress
)
- dict_tracebacks = structlog.processors.ExceptionRenderer(
- structlog.tracebacks.ExceptionDictTransformer(
- use_rich=False, show_locals=False, suppress=(click, typer)
- )
- )
+ dict_tracebacks = structlog.processors.ExceptionRenderer(dict_exc_formatter)
if hasattr(__builtins__, "BaseExceptionGroup"):
exc_group_processor = exception_group_tracebacks(dict_exc_formatter)
processors.append(exc_group_processor)
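The log.py hunks above fold the traceback-suppression module list into a single suppress tuple shared by the pretty (rich) formatter and the dict-based one. A minimal sketch of that structlog pattern outside Airflow (the module list here is illustrative):

    import contextlib

    import click
    import httpx
    import structlog

    suppress = (click, contextlib, httpx)

    # Pretty console tracebacks hide frames from the suppressed modules...
    rich_formatter = structlog.dev.RichTracebackFormatter(
        extra_lines=0, max_frames=30, indent_guides=False, suppress=suppress
    )

    # ...and the JSON/dict exception renderer reuses the same tuple.
    dict_formatter = structlog.tracebacks.ExceptionDictTransformer(
        use_rich=False, show_locals=False, suppress=suppress
    )
    dict_tracebacks = structlog.processors.ExceptionRenderer(dict_formatter)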