o-nikolas commented on code in PR #48491:
URL: https://github.com/apache/airflow/pull/48491#discussion_r2022016184
##########
providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py:
##########
@@ -62,6 +72,142 @@ def json_serialize(value: Any) -> str | None:
return watchtower._json_serialize_default(value)
[email protected](kw_only=True)
+class CloudWatchRemoteLogIO(LoggingMixin): # noqa: D101
+ base_log_folder: Path = attrs.field(converter=Path)
+ remote_base: str = ""
+ delete_local_copy: bool = True
+
+ log_group_arn: str
+ log_stream_name: str = ""
+ log_group: str = attrs.field(init=False, repr=False)
+ region_name: str = attrs.field(init=False, repr=False)
+
+ @log_group.default
+ def _(self):
+ return self.log_group_arn.split(":")[6]
+
+ @region_name.default
+ def _(self):
+ return self.log_group_arn.split(":")[3]
+
+ @cached_property
+ def hook(self):
+ """Returns AwsLogsHook."""
+ return AwsLogsHook(
+ aws_conn_id=conf.get("logging", "remote_log_conn_id"),
region_name=self.region_name
+ )
+
+ @cached_property
+ def handler(self) -> watchtower.CloudWatchLogHandler:
+ _json_serialize = conf.getimport("aws",
"cloudwatch_task_handler_json_serializer", fallback=None)
+ return watchtower.CloudWatchLogHandler(
+ log_group_name=self.log_group,
+ log_stream_name=self.log_stream_name,
+ use_queues=True,
Review Comment:
Previously we optionally used queues (queues can cause some trouble), but
here we're setting it to True always?
##########
task-sdk/src/airflow/sdk/log.py:
##########
@@ -508,18 +514,8 @@ def upload_to_remote(logger: FilteringBoundLogger):
relative_path = Path(fname).relative_to(base_log_folder)
handler = load_remote_log_handler()
- if not isinstance(handler, FileTaskHandler):
- logger.warning(
- "Airflow core logging is not using a FileTaskHandler, can't upload
logs to remote",
- handler=type(handler),
- )
+ if not handler:
return
- # This is a _monstrosity_, and super fragile, but we don't want to do the
base FileTaskHandler
- # set_context() which opens a real FH again. (And worse, in some cases it
_truncates_ the file too). This
- # is just for the first Airflow 3 betas, but we will re-write a better
remote log interface that isn't
- # tied to being a logging Handler.
- handler.log_relative_path = relative_path.as_posix() # type:
ignore[attr-defined]
- handler.upload_on_close = True # type: ignore[attr-defined]
-
- handler.close()
+ log_relative_path = relative_path.as_posix()
+ handler.upload(log_relative_path)
Review Comment:
Do we need a close on the handler after this?
##########
task-sdk/src/airflow/sdk/log.py:
##########
@@ -472,25 +486,17 @@ def init_log_file(local_relative_path: str) -> Path:
return full_path
-def load_remote_log_handler() -> logging.Handler | None:
- from airflow.logging_config import configure_logging as
airflow_configure_logging
- from airflow.utils.log.log_reader import TaskLogReader
+def load_remote_log_handler() -> RemoteLogIO | None:
+ import airflow.logging_config
- try:
- airflow_configure_logging()
-
- return TaskLogReader().log_handler
- finally:
- # This is a _monstrosity_ but put our logging back immediately...
- configure_logging()
+ return airflow.logging_config.REMOTE_TASK_LOG
def upload_to_remote(logger: FilteringBoundLogger):
# We haven't yet switched the Remote log handlers over, they are still
wired up in providers as
# logging.Handlers (but we should re-write most of them to just be the
upload and read instead of full
# variants.) In the mean time, lets just create the right handler directly
Review Comment:
Still relevant after this PR?
##########
providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py:
##########
@@ -53,6 +57,111 @@ def logmock():
yield
+# We only test this directly on Airflow 3
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="This path only works on
Airflow 3")
+class TestCloudRemoteLogIO:
+ # We use the cap_structlog so that our changes get reverted for us
+ @pytest.fixture(autouse=True)
+ def setup_tests(self, create_runtime_ti, tmp_path, monkeypatch):
+ import structlog
+
+ import airflow.logging_config
+ import airflow.sdk.log
+ from airflow.sdk import BaseOperator
+
+ task = BaseOperator(task_id="task_1")
+ self.ti = create_runtime_ti(task)
+
+ self.remote_log_group = "log_group_name"
+ self.region_name = "us-west-2"
+ self.local_log_location = tmp_path / "local-cloudwatch-log-location"
+ self.local_log_location.mkdir()
+
+ # The subject under test
+ self.subject = CloudWatchRemoteLogIO(
+ base_log_folder=self.local_log_location,
+
log_group_arn=f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
+ log_stream_name="dag_id=a/0.log",
+ )
+
+ conn = boto3.client("logs", region_name=self.region_name)
Review Comment:
Are these tests writing to a real AWS account?
##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -46,6 +46,15 @@
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
+ from airflow.typing_compat import TypeAlias
+
+
+# These types are similar, but have distinct names to make processing them
less error prone
+LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
"""The log messages themselves, either in already structured form, or a
single string blob to be parsed later"""
+LogSourceInfo: TypeAlias = list[str]
+"""Information _about_ the log fetching process for display to a user"""
Review Comment:
Love the types!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]