This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b07d79908c Implements JSON-string connection representation generator 
(#35723)
b07d79908c is described below

commit b07d79908c6aee3617954f000999ba77e4faff08
Author: Andrey Anshin <[email protected]>
AuthorDate: Fri Nov 24 00:15:21 2023 +0400

    Implements JSON-string connection representation generator (#35723)
    
    * Implements JSON-string connection representation generator
    
    * json_repr -> as_json()
    
    * Apply suggestions from code review
    
    Co-authored-by: Vincent <[email protected]>
    
    ---------
    
    Co-authored-by: Vincent <[email protected]>
---
 airflow/models/connection.py                   | 35 ++++++++++++++++
 airflow/serialization/serialized_objects.py    |  2 +-
 docs/apache-airflow/howto/connection.rst       | 37 +++++++++++++++++
 tests/always/test_connection.py                | 56 ++++++++++++++++++++++++++
 tests/serialization/test_serialized_objects.py | 23 ++++++++++-
 5 files changed, 151 insertions(+), 2 deletions(-)

diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 373c10f326..1e835b4673 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -32,6 +32,7 @@ from airflow.exceptions import AirflowException, 
AirflowNotFoundException, Remov
 from airflow.models.base import ID_LEN, Base
 from airflow.models.crypto import get_fernet
 from airflow.secrets.cache import SecretCache
+from airflow.utils.helpers import prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.secrets_masker import mask_secret
 from airflow.utils.module_loading import import_string
@@ -480,6 +481,34 @@ class Connection(Base, LoggingMixin):
     def to_dict(self) -> dict[str, Any]:
         return {"conn_id": self.conn_id, "description": self.description, 
"uri": self.get_uri()}
 
+    def to_json_dict(self, *, prune_empty: bool = False, validate: bool = 
True) -> dict[str, Any]:
+        """
+        Convert Connection to json-serializable dictionary.
+
+        :param prune_empty: Whether or not to remove empty values.
+        :param validate: Validate dictionary is JSON-serializable
+
+        :meta private:
+        """
+        conn = {
+            "conn_id": self.conn_id,
+            "conn_type": self.conn_type,
+            "description": self.description,
+            "host": self.host,
+            "login": self.login,
+            "password": self.password,
+            "schema": self.schema,
+            "port": self.port,
+        }
+        if prune_empty:
+            conn = prune_dict(val=conn, mode="strict")
+        if (extra := self.extra_dejson) or not prune_empty:
+            conn["extra"] = extra
+
+        if validate:
+            json.dumps(conn)
+        return conn
+
     @classmethod
     def from_json(cls, value, conn_id=None) -> Connection:
         kwargs = json.loads(value)
@@ -496,3 +525,9 @@ class Connection(Base, LoggingMixin):
             except ValueError:
                 raise ValueError(f"Expected integer value for `port`, but got 
{port!r} instead.")
         return Connection(conn_id=conn_id, **kwargs)
+
+    def as_json(self) -> str:
+        """Convert Connection to JSON-string object."""
+        conn = self.to_json_dict(prune_empty=True, validate=False)
+        conn.pop("conn_id", None)
+        return json.dumps(conn)
diff --git a/airflow/serialization/serialized_objects.py 
b/airflow/serialization/serialized_objects.py
index 9d7955bf01..c40d4703ee 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -498,7 +498,7 @@ class BaseSerialization:
                 type_=DAT.SIMPLE_TASK_INSTANCE,
             )
         elif isinstance(var, Connection):
-            return cls._encode(var.to_dict(), type_=DAT.CONNECTION)
+            return cls._encode(var.to_json_dict(validate=True), 
type_=DAT.CONNECTION)
         elif use_pydantic_models and _ENABLE_AIP_44:
 
             def _pydantic_model_dump(model_cls: type[BaseModel], var: Any) -> 
dict[str, Any]:
diff --git a/docs/apache-airflow/howto/connection.rst 
b/docs/apache-airflow/howto/connection.rst
index 2c07b03b6d..f90f7e96b1 100644
--- a/docs/apache-airflow/howto/connection.rst
+++ b/docs/apache-airflow/howto/connection.rst
@@ -66,6 +66,43 @@ If serializing with JSON:
         }
     }'
 
+Generating a JSON connection representation
+"""""""""""""""""""""""""""""""""""""""""""
+
+.. versionadded:: 2.8.0
+
+
+To make connection JSON generation easier, the 
:py:class:`~airflow.models.connection.Connection` class has a
+convenience method :py:meth:`~airflow.models.connection.Connection.as_json`. 
It can be used like so:
+
+.. code-block:: pycon
+
+    >>> from airflow.models.connection import Connection
+    >>> c = Connection(
+    ...     conn_id="some_conn",
+    ...     conn_type="mysql",
+    ...     description="connection description",
+    ...     host="myhost.com",
+    ...     login="myname",
+    ...     password="mypassword",
+    ...     extra={"this_param": "some val", "that_param": "other val*"},
+    ... )
+    >>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
+    AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection 
description", "host": "myhost.com", "login": "myname", "password": 
"mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'
+
+In addition, the same approach can be used to convert a Connection from URI format 
to JSON format.
+
+.. code-block:: pycon
+
+    >>> from airflow.models.connection import Connection
+    >>> c = Connection(
+    ...     conn_id="awesome_conn",
+    ...     description="Example Connection",
+    ...     
uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
+    ... )
+    >>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
+    AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example 
Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": 
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": 
{"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": 
"standard", "max_attempts": 10}}}}'
+
 
 URI format example
 ^^^^^^^^^^^^^^^^^^
diff --git a/tests/always/test_connection.py b/tests/always/test_connection.py
index 9b0df4ea89..e01907a8d3 100644
--- a/tests/always/test_connection.py
+++ b/tests/always/test_connection.py
@@ -790,3 +790,59 @@ class TestConnection:
         assert Connection(host="abc").get_uri() == "//abc"
         # parsing back as conn still works
         assert Connection(uri="//abc").host == "abc"
+
+    @pytest.mark.parametrize(
+        "conn, expected_json",
+        [
+            pytest.param(Connection(), "{}", id="empty"),
+            pytest.param(Connection(host="apache.org", extra={}), '{"host": 
"apache.org"}', id="empty-extra"),
+            pytest.param(
+                Connection(conn_type="foo", login="", password="p@$$"),
+                '{"conn_type": "foo", "login": "", "password": "p@$$"}',
+                id="some-fields",
+            ),
+            pytest.param(
+                Connection(
+                    conn_type="bar",
+                    description="Sample Description",
+                    host="example.org",
+                    login="user",
+                    password="p@$$",
+                    schema="schema",
+                    port=777,
+                    extra={"foo": "bar", "answer": 42},
+                ),
+                json.dumps(
+                    {
+                        "conn_type": "bar",
+                        "description": "Sample Description",
+                        "host": "example.org",
+                        "login": "user",
+                        "password": "p@$$",
+                        "schema": "schema",
+                        "port": 777,
+                        "extra": {"foo": "bar", "answer": 42},
+                    }
+                ),
+                id="all-fields",
+            ),
+            pytest.param(
+                Connection(uri="aws://"),
+                # During parsing URI some of the fields evaluated as an empty 
strings
+                '{"conn_type": "aws", "host": "", "schema": ""}',
+                id="uri",
+            ),
+        ],
+    )
+    def test_as_json_from_connection(self, conn: Connection, expected_json):
+        result = conn.as_json()
+        assert result == expected_json
+        restored_conn = Connection.from_json(result)
+
+        assert restored_conn.conn_type == conn.conn_type
+        assert restored_conn.description == conn.description
+        assert restored_conn.host == conn.host
+        assert restored_conn.password == conn.password
+        assert restored_conn.schema == conn.schema
+        assert restored_conn.port == conn.port
+        assert restored_conn.extra_dejson == conn.extra_dejson
diff --git a/tests/serialization/test_serialized_objects.py 
b/tests/serialization/test_serialized_objects.py
index e05f69114c..0af29e8ebc 100644
--- a/tests/serialization/test_serialized_objects.py
+++ b/tests/serialization/test_serialized_objects.py
@@ -36,7 +36,7 @@ from airflow.models.taskinstance import SimpleTaskInstance, 
TaskInstance
 from airflow.models.xcom_arg import XComArg
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
-from airflow.serialization.enums import DagAttributeTypes as DAT
+from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
 from airflow.serialization.pydantic.dag import DagModelPydantic
 from airflow.serialization.pydantic.dag_run import DagRunPydantic
 from airflow.serialization.pydantic.job import JobPydantic
@@ -213,6 +213,27 @@ def test_serialize_deserialize(input, encoded_type, 
cmp_func):
     json.dumps(serialized)  # does not raise
 
 
[email protected](
+    "conn_uri",
+    [
+        pytest.param("aws://", id="only-conn-type"),
+        
pytest.param("postgres://username:[email protected]:5432/the_database", 
id="all-non-extra"),
+        pytest.param(
+            
"///?__extra__=%7B%22foo%22%3A+%22bar%22%2C+%22answer%22%3A+42%2C+%22"
+            
"nullable%22%3A+null%2C+%22empty%22%3A+%22%22%2C+%22zero%22%3A+0%7D",
+            id="extra",
+        ),
+    ],
+)
+def test_backcompat_deserialize_connection(conn_uri):
+    """Test deserializing a connection which was serialized by the previous 
serializer implementation."""
+    from airflow.serialization.serialized_objects import BaseSerialization
+
+    conn_obj = {Encoding.TYPE: DAT.CONNECTION, Encoding.VAR: {"conn_id": 
"TEST_ID", "uri": conn_uri}}
+    deserialized = BaseSerialization.deserialize(conn_obj)
+    assert deserialized.get_uri() == conn_uri
+
+
 @pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
 @pytest.mark.parametrize(
     "input, pydantic_class, encoded_type, cmp_func",

Reply via email to