ashb commented on code in PR #27758:
URL: https://github.com/apache/airflow/pull/27758#discussion_r1067960711
##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -1656,6 +1656,12 @@ paths:
- $ref: '#/components/parameters/FullContent'
- $ref: '#/components/parameters/FilterMapIndex'
- $ref: '#/components/parameters/ContinuationToken'
+ - name: log_type
+ in: query
+ schema:
+ type: string
Review Comment:
JSONSchema has "enum" types. Does OpenAPI have something similar?
##########
airflow/api_connexion/endpoints/log_endpoint.py:
##########
@@ -84,6 +90,8 @@ def get_log(
TaskInstance.map_index == map_index,
)
.join(TaskInstance.dag_run)
+ .options(joinedload("trigger"))
Review Comment:
We're always joining against this table, even when we aren't asking for
trigger logs. We probably shouldn't
##########
airflow/jobs/triggerer_job.py:
##########
@@ -17,26 +17,176 @@
from __future__ import annotations
import asyncio
+import logging
import os
import signal
import sys
import threading
import time
+import warnings
from collections import deque
-from typing import Deque
+from queue import SimpleQueue
+from typing import TYPE_CHECKING, Deque
from sqlalchemy import func
from airflow.configuration import conf
from airflow.jobs.base_job import BaseJob
from airflow.models.trigger import Trigger
+from airflow.settings import DONOT_MODIFY_HANDLERS
from airflow.stats import Stats
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.typing_compat import TypedDict
+from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.trigger_handler import (
+ DropTriggerLogsFilter,
+ LocalQueueHandler,
+ TriggererHandlerWrapper,
+ TriggerMetadataFilter,
+ ctx_close_handler,
+ ctx_indiv_trigger,
+ ctx_task_instance,
+ ctx_trigger_id,
+)
from airflow.utils.module_loading import import_string
from airflow.utils.session import provide_session
+if TYPE_CHECKING:
+ from airflow.models import TaskInstance
+
+USING_TRIGGERER_HANDLER_WRAPPER = False
+"""
+If this value is true, trigger logging is configured to use
TriggerHandlerWrapper
+
+:meta :private
Review Comment:
```suggestion
:meta private:
```
##########
airflow/jobs/triggerer_job.py:
##########
@@ -87,19 +241,29 @@ def on_kill(self):
"""
self.runner.stop = True
+ def _kill_listener(self):
+ if self.listener:
+ for h in self.listener.handlers:
+ h.close()
+ self.listener.stop()
+
def _exit_gracefully(self, signum, frame) -> None:
"""Helper method to clean up processor_agent to avoid leaving orphan
processes."""
# The first time, try to exit nicely
if not self.runner.stop:
self.log.info("Exiting gracefully upon receiving signal %s",
signum)
self.runner.stop = True
+ self._kill_listener()
else:
self.log.warning("Forcing exit due to second exit signal %s",
signum)
Review Comment:
We don't stop the listener here. How would this play with s3 file uploading?
##########
airflow/utils/log/trigger_handler.py:
##########
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from contextvars import ContextVar
+from copy import copy
+from logging.handlers import QueueHandler
+
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+ctx_task_instance: ContextVar = ContextVar("task_instance")
+ctx_trigger_id: ContextVar = ContextVar("trigger_id")
+ctx_close_handler: ContextVar = ContextVar("close_handler")
Review Comment:
I think this one isn't used anymore
##########
airflow/jobs/triggerer_job.py:
##########
@@ -87,19 +241,29 @@ def on_kill(self):
"""
self.runner.stop = True
+ def _kill_listener(self):
+ if self.listener:
Review Comment:
Do we want to (attempt to) flush anything in the queue beforehand?
##########
airflow/utils/log/trigger_handler.py:
##########
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from contextvars import ContextVar
+from copy import copy
+from logging.handlers import QueueHandler
+
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+ctx_task_instance: ContextVar = ContextVar("task_instance")
+ctx_trigger_id: ContextVar = ContextVar("trigger_id")
+ctx_close_handler: ContextVar = ContextVar("close_handler")
+ctx_indiv_trigger: ContextVar = ContextVar("__individual_trigger")
+
+
+class TriggerMetadataFilter(logging.Filter):
+ """
+ Injects TI key, triggerer job_id, and trigger_id into the log record.
+
+ :meta private:
+ """
+
+ def filter(self, record):
+ for var in (
+ ctx_task_instance,
+ ctx_trigger_id,
+ ctx_close_handler,
+ ctx_indiv_trigger,
+ ):
+ val = var.get(None)
+ if val is not None:
+ setattr(record, var.name, val)
+ return True
+
+
+class DropTriggerLogsFilter(logging.Filter):
+ """
+ If record has non-empty attr trigger_id, filter the record.
Review Comment:
Desc doesn't match up with impl. Do we actually need the separate
`indiv_trigger` field? (i.e. if you just had this look at trigger_id I wouldn't
have looked twice)
##########
airflow/example_dags/example_time_delta_sensor_async.py:
##########
@@ -36,6 +36,6 @@
catchup=False,
tags=["example"],
) as dag:
- wait = TimeDeltaSensorAsync(task_id="wait",
delta=datetime.timedelta(seconds=10))
+ wait = TimeDeltaSensorAsync(task_id="wait",
delta=datetime.timedelta(seconds=30))
Review Comment:
Unrelated change?
##########
airflow/jobs/triggerer_job.py:
##########
@@ -87,19 +241,29 @@ def on_kill(self):
"""
self.runner.stop = True
+ def _kill_listener(self):
+ if self.listener:
Review Comment:
Edit: oh, that's what `stop()` does already :+1:
##########
airflow/jobs/triggerer_job.py:
##########
@@ -245,11 +410,10 @@ async def arun(self):
await self.cleanup_finished_triggers()
# Sleep for a bit
await asyncio.sleep(1)
- # Every minute, log status if at least one trigger is running.
Review Comment:
Why this change? Debug that was left over?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]