ashb commented on code in PR #66633:
URL: https://github.com/apache/airflow/pull/66633#discussion_r3339279734


##########
providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py:
##########
@@ -39,8 +41,6 @@
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
-    import structlog.typing

Review Comment:
   This should stay as a type checking only import?



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -467,7 +468,8 @@ def exit(n: int) -> NoReturn:
         except Exception as e:
             with suppress(Exception):
                 print(
-                    f"--- Last chance exception handler failed --- 
{repr(str(e))}\n", file=last_chance_stderr
+                    f"--- Last chance exception handler failed --- 
{repr(str(e))}\n",
+                    file=last_chance_stderr,

Review Comment:
   Formatting only change. Undo it



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -628,13 +630,22 @@ def start(
                     # execv replaces the process -- unreachable on success
                 else:
                     # Run the child entrypoint
-                    _fork_main(child_requests, child_stdout, child_stderr, 
child_logs.fileno(), target)
+                    _fork_main(
+                        child_requests,
+                        child_stdout,
+                        child_stderr,
+                        child_logs.fileno(),
+                        target,

Review Comment:
   Ditto



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2301,18 +2400,24 @@ def supervise_task(
 
         reset_secrets_masker()
 
+        log_ctx = _configure_logging(log_path, client) if log_path else 
contextlib.nullcontext((None, None))
+
         try:
-            process = ActivitySubprocess.start(
-                dag_rel_path=dag_rel_path,
-                what=ti,
-                client=client,
-                logger=logger,
-                bundle_info=bundle_info,
-                subprocess_logs_to_stdout=subprocess_logs_to_stdout,
-                sentry_integration=sentry_integration,
-            )
+            with log_ctx as (logger, _):

Review Comment:
   Why do we return two values if we only need one?



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2189,11 +2255,30 @@ def ensure_secrets_backend_loaded() -> 
list[BaseSecretsBackend]:
     return ensure_secrets_loaded(default_backends=fallback_backends)
 
 
-def _configure_logging(log_path: str, client: Client) -> 
tuple[FilteringBoundLogger, BinaryIO | TextIO]:
+def _close_remote_log_handler(handler: RemoteLogIO) -> None:
+    """
+    Close the remote log handler explicitly after all task log messages have 
been drained from the subprocess pipe.
+
+    Called after process.wait() returns, before process exit triggers
+    logging.shutdown(). This ensures Watchtower's internal batch queue
+    is flushed to CloudWatch before the process tears down.
+    """
+    with contextlib.suppress(Exception):

Review Comment:
   Hiding the error entirely makes it hard to debug problems. We should log it?



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2301,18 +2400,24 @@ def supervise_task(
 
         reset_secrets_masker()
 
+        log_ctx = _configure_logging(log_path, client) if log_path else 
contextlib.nullcontext((None, None))

Review Comment:
   Why did you move this? It changes what appears in the task logs by doing 
this later



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to