This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c01abd1c2e Upgrade watchtower to 3.0.1 (#25019) (#34747)
c01abd1c2e is described below
commit c01abd1c2eed8f60fec5b9d6cc0232b54efa52de
Author: cBiscuitSurprise <[email protected]>
AuthorDate: Fri Oct 6 09:35:09 2023 -0500
Upgrade watchtower to 3.0.1 (#25019) (#34747)
---
.../amazon/aws/log/cloudwatch_task_handler.py | 35 ++++++++++++++-
airflow/providers/amazon/provider.yaml | 24 ++++++++---
docs/apache-airflow-providers-amazon/index.rst | 2 +-
generated/provider_dependencies.json | 2 +-
.../amazon/aws/log/test_cloudwatch_task_handler.py | 50 +++++++++++++++++++++-
5 files changed, 103 insertions(+), 10 deletions(-)
diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 4d3ccec00c..126ed3ea84 100644
--- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -17,9 +17,9 @@
# under the License.
from __future__ import annotations
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta
from functools import cached_property
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
import watchtower
@@ -33,6 +33,35 @@ if TYPE_CHECKING:
from airflow.models import TaskInstance
+def json_serialize_legacy(value: Any) -> str | None:
+    """
+    JSON serializer replicating legacy watchtower behavior.
+
+    The legacy `watchtower@2.0.1` json serializer function that serialized
+    datetime objects as ISO format and all other non-JSON-serializable values to `null`.
+
+    :param value: the object to serialize
+    :return: string representation of `value` if it is an instance of datetime or `None` otherwise
+    """
+    if isinstance(value, (date, datetime)):
+        return value.isoformat()
+    else:
+        return None
+
+
+def json_serialize(value: Any) -> str | None:
+    """
+    JSON serializer replicating current watchtower behavior.
+
+    This provides customers with an accessible import,
+    `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize`
+
+    :param value: the object to serialize
+    :return: string representation of `value`
+    """
+    return watchtower._json_serialize_default(value)
+
+
 class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
     """
     CloudwatchTaskHandler is a python log handler that handles and reads task instance logs.
@@ -69,11 +98,13 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
     def set_context(self, ti):
         super().set_context(ti)
+        self.json_serialize = conf.getimport("aws", "cloudwatch_task_handler_json_serializer")
         self.handler = watchtower.CloudWatchLogHandler(
             log_group_name=self.log_group,
             log_stream_name=self._render_filename(ti, ti.try_number),
             use_queues=not getattr(ti, "is_trigger_log_context", False),
             boto3_client=self.hook.get_conn(),
+            json_serialize_default=self.json_serialize,
         )

     def close(self):
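
For context on the two serializers added above, here is a sketch of how they behave when used as a `json.dumps` default (the payload and printed outputs are illustrative, not taken from this commit; watchtower applies the configured function internally as its `json_serialize_default` hook):

    import json
    from datetime import datetime

    from airflow.providers.amazon.aws.log.cloudwatch_task_handler import (
        json_serialize,
        json_serialize_legacy,
    )

    payload = {"when": datetime(2023, 1, 1), "obj": object()}

    # Legacy (watchtower 2.x) behavior: datetimes become ISO strings,
    # any other non-JSON-serializable value becomes null.
    print(json.dumps(payload, default=json_serialize_legacy))
    # {"when": "2023-01-01T00:00:00", "obj": null}

    # Current (watchtower 3.x) behavior: falls back to repr() for
    # unknown objects, so output depends on each object's __repr__.
    print(json.dumps(payload, default=json_serialize))
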
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 8ce738480c..3f5ad39025 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -81,11 +81,7 @@ dependencies:
   # NOTE!!! BOTOCORE version is always shifted by 3 MINOR VERSIONS from boto3
   # See https://github.com/boto/boto3/issues/2702
   - botocore>=1.31.0
-  # watchtower 3 has been released end Jan and introduced breaking change across the board that might
-  # change logging behaviour:
-  # https://github.com/kislyuk/watchtower/blob/develop/Changes.rst#changes-for-v300-2022-01-26
-  # TODO: update to watchtower >3
-  - watchtower~=2.0.1
+  - watchtower~=3.0.1
   - jsonpath_ng>=1.5.3
   - redshift_connector>=2.0.888
   - sqlalchemy_redshift>=0.8.6
@@ -726,3 +722,21 @@ config:
         example: my_company.aws.MyCustomSessionFactory
         type: string
         version_added: 3.1.1
+      cloudwatch_task_handler_json_serializer:
+        description: |
+          By default, when logging non-string messages, all non-JSON-serializable objects are logged as `null`,
+          except `datetime` objects, which are ISO formatted. Users can optionally use a `repr` serializer or
+          provide their own JSON serializer for any non-JSON-serializable objects in the logged message.
+
+          * `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize` uses `repr` (be aware
+            there is the potential of logging sensitive data depending on the `repr` method of logged objects)
+          * `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy` uses `null`.
+
+          If a custom serializer is provided, it must adhere to `Callable[[Any], str | None]`, where `None`
+          serializes to `null` (e.g. `def my_serializer(o: Any) -> str | None`). Since this is on the logging
+          path and it's possible there's an exception being handled, special care should be taken to fail
+          gracefully without raising a new exception inside your serializer.
+        type: string
+        version_added: 8.7.2
+        example: airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize
+        default: airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy
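
A custom serializer satisfying the `Callable[[Any], str | None]` contract described above might look like the following sketch (the `my_company.logging_utils` module path and function name are hypothetical):

    # my_company/logging_utils.py -- hypothetical module referenced from
    # the [aws] cloudwatch_task_handler_json_serializer option.
    from __future__ import annotations

    from datetime import date, datetime
    from typing import Any


    def safe_json_serializer(value: Any) -> str | None:
        """ISO-format datetimes, repr() everything else; returning None serializes to `null`."""
        try:
            if isinstance(value, (date, datetime)):
                return value.isoformat()
            return repr(value)
        except Exception:
            # This runs on the logging path, possibly while an exception
            # is already being handled, so fail gracefully to `null`
            # instead of raising.
            return None

It would then be referenced by dotted path, e.g. `cloudwatch_task_handler_json_serializer = my_company.logging_utils.safe_json_serializer` in the `[aws]` section of airflow.cfg, or via the `AIRFLOW__AWS__CLOUDWATCH_TASK_HANDLER_JSON_SERIALIZER` environment variable.
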
diff --git a/docs/apache-airflow-providers-amazon/index.rst b/docs/apache-airflow-providers-amazon/index.rst
index d846b73c56..c1f52f9651 100644
--- a/docs/apache-airflow-providers-amazon/index.rst
+++ b/docs/apache-airflow-providers-amazon/index.rst
@@ -112,7 +112,7 @@ PIP package Version required
 ``boto3``                  ``>=1.28.0``
 ``botocore``               ``>=1.31.0``
 ``asgiref``
-``watchtower``             ``~=2.0.1``
+``watchtower``             ``~=3.0.1``
 ``jsonpath_ng``            ``>=1.5.3``
 ``redshift_connector``     ``>=2.0.888``
 ``sqlalchemy_redshift``    ``>=0.8.6``
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 212f308989..5fc8ed0df0 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -30,7 +30,7 @@
"jsonpath_ng>=1.5.3",
"redshift_connector>=2.0.888",
"sqlalchemy_redshift>=0.8.6",
- "watchtower~=2.0.1"
+ "watchtower~=3.0.1"
],
"cross-providers-deps": [
"apache.hive",
diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
index 00d5cf2f46..b3c1cd6e39 100644
--- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -17,10 +17,12 @@
# under the License.
from __future__ import annotations
+import contextlib
+import logging
import time
from datetime import datetime as dt, timedelta
from unittest import mock
-from unittest.mock import call
+from unittest.mock import ANY, Mock, call
import boto3
import moto
@@ -171,6 +173,52 @@ class TestCloudwatchTaskHandler:
             end_time=expected_end_time,
         )

+    @pytest.mark.parametrize(
+        "conf_json_serialize, expected_serialized_output",
+        [
+            (None, '{"datetime": "2023-01-01T00:00:00+00:00", "customObject": null}'),
+            (
+                "airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize",
+                '{"datetime": "2023-01-01T00:00:00+00:00", "customObject": "SomeCustomSerialization(...)"}',
+            ),
+        ],
+    )
+    @mock.patch.object(AwsLogsHook, "get_log_events")
+    def test_write_json_logs(self, mock_get_log_events, conf_json_serialize, expected_serialized_output):
+        class ToSerialize:
+            def __init__(self):
+                pass
+
+            def __repr__(self):
+                return "SomeCustomSerialization(...)"
+
+        with contextlib.ExitStack() as stack:
+            if conf_json_serialize:
+                stack.enter_context(
+                    conf_vars({("aws", "cloudwatch_task_handler_json_serializer"): conf_json_serialize})
+                )
+
+            handler = self.cloudwatch_task_handler
+            handler.set_context(self.ti)
+            message = logging.LogRecord(
+                name="test_log_record",
+                level=logging.DEBUG,
+                pathname="fake.path",
+                lineno=42,
+                args=None,
+                exc_info=None,
+                msg={
+                    "datetime": datetime(2023, 1, 1),
+                    "customObject": ToSerialize(),
+                },
+            )
+            stack.enter_context(mock.patch("watchtower.threading.Thread"))
+            mock_queue = Mock()
+            stack.enter_context(mock.patch("watchtower.queue.Queue", return_value=mock_queue))
+            handler.handle(message)
+
+            mock_queue.put.assert_called_once_with({"message": expected_serialized_output, "timestamp": ANY})
+
     def test_close_prevents_duplicate_calls(self):
         with mock.patch("watchtower.CloudWatchLogHandler.close") as mock_log_handler_close:
             with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):