ashb commented on code in PR #27758:
URL: https://github.com/apache/airflow/pull/27758#discussion_r1067960711


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -1656,6 +1656,12 @@ paths:
       - $ref: '#/components/parameters/FullContent'
       - $ref: '#/components/parameters/FilterMapIndex'
       - $ref: '#/components/parameters/ContinuationToken'
+      - name: log_type
+        in: query
+        schema:
+          type: string

Review Comment:
   JSONSchema has "enum" types. Does OpenAPI have something similar?
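OpenAPI 3 schema objects do accept `enum` (e.g. an `enum:` list under `schema:` next to `type: string`), so the spec could restrict the values there. Below is a minimal Python sketch of the matching server-side check. It assumes the allowed values are `"trigger"` and `"worker"`, which is an assumption: the hunk above only declares `type: string`. `LogType` and `parse_log_type` are hypothetical names.

```python
from __future__ import annotations

from enum import Enum


class LogType(str, Enum):
    """Hypothetical allowed values for the ``log_type`` query parameter."""

    TRIGGER = "trigger"
    WORKER = "worker"


def parse_log_type(raw: str | None) -> LogType | None:
    """Return the LogType for a raw query value; None means "not requested"."""
    if raw is None:
        return None
    try:
        return LogType(raw)
    except ValueError:
        allowed = ", ".join(member.value for member in LogType)
        # With an enum in the spec, request validation (if enabled) could reject this earlier.
        raise ValueError(f"invalid log_type {raw!r}; expected one of: {allowed}") from None
```

Mixing in `str` keeps members comparable to plain strings, so `LogType.WORKER == "worker"` holds.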



##########
airflow/api_connexion/endpoints/log_endpoint.py:
##########
@@ -84,6 +90,8 @@ def get_log(
             TaskInstance.map_index == map_index,
         )
         .join(TaskInstance.dag_run)
+        .options(joinedload("trigger"))

Review Comment:
   We're always joining against this table, even when we aren't asking for trigger logs. We probably shouldn't.
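A minimal sketch of attaching the eager load only when trigger logs are requested. The helper name `maybe_join_trigger` and the `"trigger"` value for `log_type` are assumptions. `joinedload` is passed in as a parameter (in the endpoint it would be SQLAlchemy's `sqlalchemy.orm.joinedload`) so the sketch runs without a database.

```python
from __future__ import annotations

from typing import Any, Callable


def maybe_join_trigger(query: Any, log_type: str | None, joinedload: Callable[[str], Any]) -> Any:
    """Eager-load ``TaskInstance.trigger`` only when trigger logs were requested."""
    if log_type == "trigger":
        # Only trigger logs need the Trigger row, so only then pay for the JOIN.
        return query.options(joinedload("trigger"))
    # Worker logs (or no log_type at all) keep the original query untouched.
    return query
```

In the endpoint this would wrap the query built above before it is executed, e.g. `query = maybe_join_trigger(query, log_type, joinedload)`.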



##########
airflow/jobs/triggerer_job.py:
##########
@@ -17,26 +17,176 @@
 from __future__ import annotations
 
 import asyncio
+import logging
 import os
 import signal
 import sys
 import threading
 import time
+import warnings
 from collections import deque
-from typing import Deque
+from queue import SimpleQueue
+from typing import TYPE_CHECKING, Deque
 
 from sqlalchemy import func
 
 from airflow.configuration import conf
 from airflow.jobs.base_job import BaseJob
 from airflow.models.trigger import Trigger
+from airflow.settings import DONOT_MODIFY_HANDLERS
 from airflow.stats import Stats
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.typing_compat import TypedDict
+from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.trigger_handler import (
+    DropTriggerLogsFilter,
+    LocalQueueHandler,
+    TriggererHandlerWrapper,
+    TriggerMetadataFilter,
+    ctx_close_handler,
+    ctx_indiv_trigger,
+    ctx_task_instance,
+    ctx_trigger_id,
+)
 from airflow.utils.module_loading import import_string
 from airflow.utils.session import provide_session
 
+if TYPE_CHECKING:
+    from airflow.models import TaskInstance
+
+USING_TRIGGERER_HANDLER_WRAPPER = False
+"""
+If this value is true, trigger logging is configured to use TriggerHandlerWrapper
+
+:meta :private

Review Comment:
   ```suggestion
   :meta private:
   ```



##########
airflow/jobs/triggerer_job.py:
##########
@@ -87,19 +241,29 @@ def on_kill(self):
         """
         self.runner.stop = True
 
+    def _kill_listener(self):
+        if self.listener:
+            for h in self.listener.handlers:
+                h.close()
+            self.listener.stop()
+
     def _exit_gracefully(self, signum, frame) -> None:
         """Helper method to clean up processor_agent to avoid leaving orphan processes."""
         # The first time, try to exit nicely
         if not self.runner.stop:
             self.log.info("Exiting gracefully upon receiving signal %s", signum)
             self.runner.stop = True
+            self._kill_listener()
         else:
             self.log.warning("Forcing exit due to second exit signal %s", signum)

Review Comment:
   We don't stop the listener here. How would this play with S3 file uploading?
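One way to address this, sketched under assumptions: stop the listener on both signal paths, and stop it before closing its handlers. `QueueListener.stop()` waits for the listener thread to process what is already queued, so closing handlers afterwards lets a handler that does its final work in `close()` (as a remote/S3 handler might; that is an assumption here) see every record. `ExitHandlerSketch` and its attributes are hypothetical stand-ins for the triggerer job, and whatever the real forced-exit path does after logging is omitted.

```python
from __future__ import annotations

import logging
from logging.handlers import QueueListener


class ExitHandlerSketch:
    """Hypothetical stand-in for the triggerer job's signal handling."""

    def __init__(self, listener: QueueListener | None) -> None:
        self.listener = listener
        self.stop_requested = False  # plays the role of `self.runner.stop`
        self.log = logging.getLogger(__name__)

    def _kill_listener(self) -> None:
        # Swap the listener out first so a second signal cannot stop it twice.
        listener, self.listener = self.listener, None
        if listener is None:
            return
        # Drain: stop() returns once the thread has handled everything queued so far.
        listener.stop()
        # Only then close the handlers, so any work they do in close() sees all records.
        for handler in listener.handlers:
            handler.close()

    def exit_gracefully(self, signum: int, frame: object) -> None:
        if not self.stop_requested:
            self.log.info("Exiting gracefully upon receiving signal %s", signum)
            self.stop_requested = True
        else:
            self.log.warning("Forcing exit due to second exit signal %s", signum)
        # Stop the listener on both paths; the forced path in the hunk above does not.
        self._kill_listener()
```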



##########
airflow/utils/log/trigger_handler.py:
##########
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from contextvars import ContextVar
+from copy import copy
+from logging.handlers import QueueHandler
+
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+ctx_task_instance: ContextVar = ContextVar("task_instance")
+ctx_trigger_id: ContextVar = ContextVar("trigger_id")
+ctx_close_handler: ContextVar = ContextVar("close_handler")

Review Comment:
   I think this one isn't used anymore



##########
airflow/jobs/triggerer_job.py:
##########
@@ -87,19 +241,29 @@ def on_kill(self):
         """
         self.runner.stop = True
 
+    def _kill_listener(self):
+        if self.listener:

Review Comment:
   Do we want to (attempt to) flush anything in the queue beforehand?



##########
airflow/utils/log/trigger_handler.py:
##########
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from contextvars import ContextVar
+from copy import copy
+from logging.handlers import QueueHandler
+
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+ctx_task_instance: ContextVar = ContextVar("task_instance")
+ctx_trigger_id: ContextVar = ContextVar("trigger_id")
+ctx_close_handler: ContextVar = ContextVar("close_handler")
+ctx_indiv_trigger: ContextVar = ContextVar("__individual_trigger")
+
+
+class TriggerMetadataFilter(logging.Filter):
+    """
+    Injects TI key, triggerer job_id, and trigger_id into the log record.
+
+    :meta private:
+    """
+
+    def filter(self, record):
+        for var in (
+            ctx_task_instance,
+            ctx_trigger_id,
+            ctx_close_handler,
+            ctx_indiv_trigger,
+        ):
+            val = var.get(None)
+            if val is not None:
+                setattr(record, var.name, val)
+        return True
+
+
+class DropTriggerLogsFilter(logging.Filter):
+    """
+    If record has non-empty attr trigger_id, filter the record.

Review Comment:
   Desc doesn't match up with impl. Do we actually need the separate `indiv_trigger` field? (i.e. if you just had this look at `trigger_id` I wouldn't have looked twice)
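For comparison, a sketch of the two filters keyed on `trigger_id` alone, as the comment suggests. This drops the separate `indiv_trigger` field and is a hypothetical simplification, not the PR's code. Because asyncio runs each task in a copy of the current context, a `ctx_trigger_id` value set inside one trigger's task is not visible to the others.

```python
from __future__ import annotations

import logging
from contextvars import ContextVar

ctx_trigger_id: ContextVar = ContextVar("trigger_id")


class TriggerMetadataFilter(logging.Filter):
    """Copy the current trigger_id (if any) onto the record; never drops anything."""

    def filter(self, record: logging.LogRecord) -> bool:
        trigger_id = ctx_trigger_id.get(None)
        if trigger_id is not None:
            record.trigger_id = trigger_id
        return True


class DropTriggerLogsFilter(logging.Filter):
    """Drop any record that carries a trigger_id, i.e. was emitted inside a trigger."""

    def filter(self, record: logging.LogRecord) -> bool:
        return getattr(record, "trigger_id", None) is None
```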



##########
airflow/example_dags/example_time_delta_sensor_async.py:
##########
@@ -36,6 +36,6 @@
     catchup=False,
     tags=["example"],
 ) as dag:
-    wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=10))
+    wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=30))

Review Comment:
   Unrelated change?



##########
airflow/jobs/triggerer_job.py:
##########
@@ -87,19 +241,29 @@ def on_kill(self):
         """
         self.runner.stop = True
 
+    def _kill_listener(self):
+        if self.listener:

Review Comment:
   Edit: oh, that's what `stop()` does already :+1:
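A quick standard-library check of that: records already handed to a `QueueHandler` are all delivered once `stop()` returns, because `stop()` enqueues a sentinel behind them and joins the listener thread. `drain_demo` and `ListHandler` are illustrative names; `SimpleQueue` matches the queue type imported in the diff.

```python
from __future__ import annotations

import logging
import queue
from logging.handlers import QueueHandler, QueueListener


class ListHandler(logging.Handler):
    """Collects messages so the demo can inspect what was delivered."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def drain_demo(n: int = 100) -> list[str]:
    """Log n records through a QueueHandler, stop the listener, return what arrived."""
    q: queue.SimpleQueue = queue.SimpleQueue()
    sink = ListHandler()
    listener = QueueListener(q, sink)
    logger = logging.getLogger("drain_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo records away from the root logger
    queue_handler = QueueHandler(q)
    logger.addHandler(queue_handler)
    listener.start()
    try:
        for i in range(n):
            logger.info("record %d", i)
    finally:
        # stop() puts a sentinel behind the queued records and joins the thread,
        # so every record above reaches `sink` before it returns.
        listener.stop()
        logger.removeHandler(queue_handler)
    return sink.messages
```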



##########
airflow/jobs/triggerer_job.py:
##########
@@ -245,11 +410,10 @@ async def arun(self):
             await self.cleanup_finished_triggers()
             # Sleep for a bit
             await asyncio.sleep(1)
-            # Every minute, log status if at least one trigger is running.

Review Comment:
   Why this change? Debug that was left over?
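For reference, the behaviour the removed comment describes (log status once a minute while at least one trigger is running) can be sketched as a small rate limiter. `StatusLogger`, its `interval` and injectable `clock`, and the log message are assumptions for illustration, not the triggerer's code.

```python
from __future__ import annotations

import logging
import time
from typing import Callable


class StatusLogger:
    """Hypothetical helper: log a status line at most once per `interval` seconds."""

    def __init__(self, interval: float = 60.0, clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self.clock = clock
        self.last_logged = clock()
        self.log = logging.getLogger(__name__)

    def maybe_log(self, running: int) -> bool:
        """Log only when at least one trigger runs and the interval has elapsed."""
        now = self.clock()
        if running and now - self.last_logged >= self.interval:
            self.log.info("%i triggers currently running", running)
            self.last_logged = now
            return True
        return False
```

Injecting the clock keeps the check testable without sleeping; a monotonic clock avoids surprises when the wall clock is adjusted.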



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
