o-nikolas commented on code in PR #48491:
URL: https://github.com/apache/airflow/pull/48491#discussion_r2022016184


##########
providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py:
##########
@@ -62,6 +72,142 @@ def json_serialize(value: Any) -> str | None:
     return watchtower._json_serialize_default(value)
 
 
[email protected](kw_only=True)
+class CloudWatchRemoteLogIO(LoggingMixin):  # noqa: D101
+    base_log_folder: Path = attrs.field(converter=Path)
+    remote_base: str = ""
+    delete_local_copy: bool = True
+
+    log_group_arn: str
+    log_stream_name: str = ""
+    log_group: str = attrs.field(init=False, repr=False)
+    region_name: str = attrs.field(init=False, repr=False)
+
+    @log_group.default
+    def _(self):
+        return self.log_group_arn.split(":")[6]
+
+    @region_name.default
+    def _(self):
+        return self.log_group_arn.split(":")[3]
+
+    @cached_property
+    def hook(self):
+        """Returns AwsLogsHook."""
+        return AwsLogsHook(
+            aws_conn_id=conf.get("logging", "remote_log_conn_id"), 
region_name=self.region_name
+        )
+
+    @cached_property
+    def handler(self) -> watchtower.CloudWatchLogHandler:
+        _json_serialize = conf.getimport("aws", 
"cloudwatch_task_handler_json_serializer", fallback=None)
+        return watchtower.CloudWatchLogHandler(
+            log_group_name=self.log_group,
+            log_stream_name=self.log_stream_name,
+            use_queues=True,

Review Comment:
   Previously we optionally used queues (queues can cause some trouble) but 
here we're setting it to True always?



##########
task-sdk/src/airflow/sdk/log.py:
##########
@@ -508,18 +514,8 @@ def upload_to_remote(logger: FilteringBoundLogger):
     relative_path = Path(fname).relative_to(base_log_folder)
 
     handler = load_remote_log_handler()
-    if not isinstance(handler, FileTaskHandler):
-        logger.warning(
-            "Airflow core logging is not using a FileTaskHandler, can't upload 
logs to remote",
-            handler=type(handler),
-        )
+    if not handler:
         return
 
-    # This is a _monstrosity_, and super fragile, but we don't want to do the 
base FileTaskHandler
-    # set_context() which opens a real FH again. (And worse, in some cases it 
_truncates_ the file too). This
-    # is just for the first Airflow 3 betas, but we will re-write a better 
remote log interface that isn't
-    # tied to being a logging Handler.
-    handler.log_relative_path = relative_path.as_posix()  # type: 
ignore[attr-defined]
-    handler.upload_on_close = True  # type: ignore[attr-defined]
-
-    handler.close()
+    log_relative_path = relative_path.as_posix()
+    handler.upload(log_relative_path)

Review Comment:
   Do we need a close on the handler after this?



##########
task-sdk/src/airflow/sdk/log.py:
##########
@@ -472,25 +486,17 @@ def init_log_file(local_relative_path: str) -> Path:
     return full_path
 
 
-def load_remote_log_handler() -> logging.Handler | None:
-    from airflow.logging_config import configure_logging as 
airflow_configure_logging
-    from airflow.utils.log.log_reader import TaskLogReader
+def load_remote_log_handler() -> RemoteLogIO | None:
+    import airflow.logging_config
 
-    try:
-        airflow_configure_logging()
-
-        return TaskLogReader().log_handler
-    finally:
-        # This is a _monstrosity_ but put our logging back immediately...
-        configure_logging()
+    return airflow.logging_config.REMOTE_TASK_LOG
 
 
 def upload_to_remote(logger: FilteringBoundLogger):
     # We haven't yet switched the Remote log handlers over, they are still 
wired up in providers as
     # logging.Handlers (but we should re-write most of them to just be the 
upload and read instead of full
     # variants.) In the mean time, lets just create the right handler directly

Review Comment:
   Still relevant after this PR?



##########
providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py:
##########
@@ -53,6 +57,111 @@ def logmock():
         yield
 
 
+# We only test this directly on Airflow 3
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="This path only works on 
Airflow 3")
+class TestCloudRemoteLogIO:
+    # We use the cap_structlog so that our changes get reverted for us
+    @pytest.fixture(autouse=True)
+    def setup_tests(self, create_runtime_ti, tmp_path, monkeypatch):
+        import structlog
+
+        import airflow.logging_config
+        import airflow.sdk.log
+        from airflow.sdk import BaseOperator
+
+        task = BaseOperator(task_id="task_1")
+        self.ti = create_runtime_ti(task)
+
+        self.remote_log_group = "log_group_name"
+        self.region_name = "us-west-2"
+        self.local_log_location = tmp_path / "local-cloudwatch-log-location"
+        self.local_log_location.mkdir()
+
+        # The subject under test
+        self.subject = CloudWatchRemoteLogIO(
+            base_log_folder=self.local_log_location,
+            
log_group_arn=f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
+            log_stream_name="dag_id=a/0.log",
+        )
+
+        conn = boto3.client("logs", region_name=self.region_name)

Review Comment:
   Are these tests writing to a real AWS account?



##########
airflow-core/src/airflow/utils/log/file_task_handler.py:
##########
@@ -46,6 +46,15 @@
     from airflow.executors.base_executor import BaseExecutor
     from airflow.models.taskinstance import TaskInstance
     from airflow.models.taskinstancekey import TaskInstanceKey
+    from airflow.typing_compat import TypeAlias
+
+
+# These types are similar, but have distinct names to make processing them 
less error prone
+LogMessages: TypeAlias = Union[list["StructuredLogMessage"], list[str]]
+"""The log messages themselves, either in already sturcutured form, or a 
single string blob to be parsed later"""
+LogSourceInfo: TypeAlias = list[str]
+"""Information _about_ the log fetching process for display to a user"""

Review Comment:
   Love the types!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to