This is an automated email from the ASF dual-hosted git repository.

bugraoz pushed a commit to branch revert-57988-bugfix/remove-unused-null-fernet
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cb56ad7a8893051a6086cda23ce79e422f5e17da
Author: Bugra Ozturk <[email protected]>
AuthorDate: Sun Nov 9 18:36:48 2025 +0100

    Revert "Remove unused NullFernet from Crypto (#57988)"
    
    This reverts commit c47c0817d8e11f885f4eef91232c08e1469e8c95.
---
 airflow-core/src/airflow/models/connection.py      | 14 ++++-
 airflow-core/src/airflow/models/crypto.py          | 61 ++++++++++++++++++++--
 airflow-core/src/airflow/models/variable.py        |  2 +-
 airflow-core/tests/unit/always/test_connection.py  | 12 +++++
 .../cli/commands/test_rotate_fernet_key_command.py | 24 +++++++++
 airflow-core/tests/unit/models/test_variable.py    | 14 +++++
 6 files changed, 121 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/models/connection.py 
b/airflow-core/src/airflow/models/connection.py
index 3f9e5e3deed..f64c38b5efe 100644
--- a/airflow-core/src/airflow/models/connection.py
+++ b/airflow-core/src/airflow/models/connection.py
@@ -343,6 +343,11 @@ class Connection(Base, LoggingMixin):
         """Return encrypted password."""
         if self._password and self.is_encrypted:
             fernet = get_fernet()
+            if not fernet.is_encrypted:
+                raise AirflowException(
+                    f"Can't decrypt encrypted password for login={self.login}  
"
+                    f"FERNET_KEY configuration is missing"
+                )
             return fernet.decrypt(bytes(self._password, "utf-8")).decode()
         return self._password
 
@@ -351,7 +356,7 @@ class Connection(Base, LoggingMixin):
         if value:
             fernet = get_fernet()
             self._password = fernet.encrypt(bytes(value, "utf-8")).decode()
-            self.is_encrypted = True
+            self.is_encrypted = fernet.is_encrypted
 
     @declared_attr
     def password(cls):
@@ -362,6 +367,11 @@ class Connection(Base, LoggingMixin):
         """Return encrypted extra-data."""
         if self._extra and self.is_extra_encrypted:
             fernet = get_fernet()
+            if not fernet.is_encrypted:
+                raise AirflowException(
+                    f"Can't decrypt `extra` params for login={self.login}, "
+                    f"FERNET_KEY configuration is missing"
+                )
             extra_val = fernet.decrypt(bytes(self._extra, "utf-8")).decode()
         else:
             extra_val = self._extra
@@ -375,7 +385,7 @@ class Connection(Base, LoggingMixin):
             self._validate_extra(value, self.conn_id)
             fernet = get_fernet()
             self._extra = fernet.encrypt(bytes(value, "utf-8")).decode()
-            self.is_extra_encrypted = True
+            self.is_extra_encrypted = fernet.is_encrypted
         else:
             self._extra = value
             self.is_extra_encrypted = False
diff --git a/airflow-core/src/airflow/models/crypto.py 
b/airflow-core/src/airflow/models/crypto.py
index 31423bb8444..c62446b7631 100644
--- a/airflow-core/src/airflow/models/crypto.py
+++ b/airflow-core/src/airflow/models/crypto.py
@@ -30,6 +30,8 @@ log = logging.getLogger(__name__)
 class FernetProtocol(Protocol):
     """This class is only used for TypeChecking (for IDEs, mypy, etc)."""
 
+    is_encrypted: bool
+
     def decrypt(self, msg: bytes | str, ttl: int | None = None) -> bytes:
         """Decrypt with Fernet."""
         ...
@@ -38,9 +40,57 @@ class FernetProtocol(Protocol):
         """Encrypt with Fernet."""
         ...
 
+
+class _NullFernet:
+    """
+    A "Null" encryptor class that doesn't encrypt or decrypt but that presents 
a similar interface to Fernet.
+
+    The purpose of this is to make the rest of the code not have to know the
+    difference, and to only display the message once, not 20 times when
+    `airflow db migrate` is run.
+    """
+
+    is_encrypted = False
+
+    def decrypt(self, msg: bytes | str, ttl: int | None = None) -> bytes:
+        """Decrypt with Fernet."""
+        if isinstance(msg, bytes):
+            return msg
+        if isinstance(msg, str):
+            return msg.encode("utf-8")
+        raise ValueError(f"Expected bytes or str, got {type(msg)}")
+
+    def encrypt(self, msg: bytes) -> bytes:
+        """Encrypt with Fernet."""
+        return msg
+
+
+class _RealFernet:
+    """
+    A wrapper around the real Fernet to set is_encrypted to True.
+
+    This class is only used internally to avoid changing the interface of
+    the get_fernet function.
+    """
+
+    from cryptography.fernet import Fernet, MultiFernet
+
+    is_encrypted = True
+
+    def __init__(self, fernet: MultiFernet):
+        self._fernet = fernet
+
+    def decrypt(self, msg: bytes | str, ttl: int | None = None) -> bytes:
+        """Decrypt with Fernet."""
+        return self._fernet.decrypt(msg, ttl)
+
+    def encrypt(self, msg: bytes) -> bytes:
+        """Encrypt with Fernet."""
+        return self._fernet.encrypt(msg)
+
     def rotate(self, msg: bytes | str) -> bytes:
         """Rotate the Fernet key for the given message."""
-        ...
+        return self._fernet.rotate(msg)
 
 
 @cache
@@ -57,7 +107,12 @@ def get_fernet() -> FernetProtocol:
     from cryptography.fernet import Fernet, MultiFernet
 
     try:
-        fernet_key = conf.get_mandatory_value("core", "FERNET_KEY")
-        return MultiFernet([Fernet(fernet_part.encode("utf-8")) for 
fernet_part in fernet_key.split(",")])
+        fernet_key = conf.get("core", "FERNET_KEY")
+        if not fernet_key:
+            log.warning("empty cryptography key - values will not be stored 
encrypted.")
+            return _NullFernet()
+
+        fernet = MultiFernet([Fernet(fernet_part.encode("utf-8")) for 
fernet_part in fernet_key.split(",")])
+        return _RealFernet(fernet)
     except (ValueError, TypeError) as value_error:
         raise AirflowException(f"Could not create Fernet object: 
{value_error}")
diff --git a/airflow-core/src/airflow/models/variable.py 
b/airflow-core/src/airflow/models/variable.py
index c5037561cbc..a13eb4fe158 100644
--- a/airflow-core/src/airflow/models/variable.py
+++ b/airflow-core/src/airflow/models/variable.py
@@ -97,7 +97,7 @@ class Variable(Base, LoggingMixin):
         if value is not None:
             fernet = get_fernet()
             self._val = fernet.encrypt(bytes(value, "utf-8")).decode()
-            self.is_encrypted = True
+            self.is_encrypted = fernet.is_encrypted
 
     @declared_attr
     def val(cls):
diff --git a/airflow-core/tests/unit/always/test_connection.py 
b/airflow-core/tests/unit/always/test_connection.py
index 3f3b3af4bfa..d613c0d77da 100644
--- a/airflow-core/tests/unit/always/test_connection.py
+++ b/airflow-core/tests/unit/always/test_connection.py
@@ -109,6 +109,18 @@ class TestConnection:
     def teardown_method(self):
         self.patcher.stop()
 
+    @conf_vars({("core", "fernet_key"): ""})
+    def test_connection_extra_no_encryption(self):
+        """
+        Tests extras on a new connection without encryption. The fernet key
+        is set to a non-base64-encoded string and the extra is stored without
+        encryption.
+        """
+        crypto.get_fernet.cache_clear()
+        test_connection = Connection(extra='{"apache": "airflow"}')
+        assert not test_connection.is_extra_encrypted
+        assert test_connection.extra == '{"apache": "airflow"}'
+
     @conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()})
     def test_connection_extra_with_encryption(self):
         """
diff --git 
a/airflow-core/tests/unit/cli/commands/test_rotate_fernet_key_command.py 
b/airflow-core/tests/unit/cli/commands/test_rotate_fernet_key_command.py
index b2638208fa3..dbd250c2c2d 100644
--- a/airflow-core/tests/unit/cli/commands/test_rotate_fernet_key_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_rotate_fernet_key_command.py
@@ -49,8 +49,14 @@ class TestRotateFernetKeyCommand:
     def test_should_rotate_variable(self, session):
         fernet_key1 = Fernet.generate_key()
         fernet_key2 = Fernet.generate_key()
+        var1_key = f"{__file__}_var1"
         var2_key = f"{__file__}_var2"
 
+        # Create unencrypted variable
+        with conf_vars({("core", "fernet_key"): ""}):
+            get_fernet.cache_clear()  # Clear cached fernet
+            Variable.set(key=var1_key, value="value")
+
         # Create encrypted variable
         with conf_vars({("core", "fernet_key"): fernet_key1.decode()}):
             get_fernet.cache_clear()  # Clear cached fernet
@@ -65,14 +71,25 @@ class TestRotateFernetKeyCommand:
         # Assert correctness using a new fernet key
         with conf_vars({("core", "fernet_key"): fernet_key2.decode()}):
             get_fernet.cache_clear()  # Clear cached fernet
+            var1 = session.query(Variable).filter(Variable.key == 
var1_key).first()
+            # Unencrypted variable should be unchanged
+            assert Variable.get(key=var1_key) == "value"
+            assert var1._val == "value"
             assert Variable.get(key=var2_key) == "value"
 
     @provide_session
     def test_should_rotate_connection(self, session, mock_supervisor_comms):
         fernet_key1 = Fernet.generate_key()
         fernet_key2 = Fernet.generate_key()
+        var1_key = f"{__file__}_var1"
         var2_key = f"{__file__}_var2"
 
+        # Create unencrypted variable
+        with conf_vars({("core", "fernet_key"): ""}):
+            get_fernet.cache_clear()  # Clear cached fernet
+            session.add(Connection(conn_id=var1_key, 
uri="mysql://user:pass@localhost"))
+            session.commit()
+
         # Create encrypted variable
         with conf_vars({("core", "fernet_key"): fernet_key1.decode()}):
             get_fernet.cache_clear()  # Clear cached fernet
@@ -102,10 +119,17 @@ class TestRotateFernetKeyCommand:
                 )
             raise Exception(f"Connection {conn_id} not found")
 
+        # Mock the send method to return our connection data
+        mock_supervisor_comms.send.return_value = mock_get_connection(var1_key)
+
         # Assert correctness using a new fernet key
         with conf_vars({("core", "fernet_key"): fernet_key2.decode()}):
             get_fernet.cache_clear()  # Clear cached fernet
 
+            # Unencrypted variable should be unchanged
+            conn1: Connection = BaseHook.get_connection(var1_key)
+            assert conn1.password == "pass"
+
             # Mock for the second connection
             mock_supervisor_comms.send.return_value = 
mock_get_connection(var2_key)
             assert BaseHook.get_connection(var2_key).password == "pass"
diff --git a/airflow-core/tests/unit/models/test_variable.py 
b/airflow-core/tests/unit/models/test_variable.py
index 6ae364be503..b02a760a665 100644
--- a/airflow-core/tests/unit/models/test_variable.py
+++ b/airflow-core/tests/unit/models/test_variable.py
@@ -52,6 +52,20 @@ class TestVariable:
             yield
         db.clear_db_variables()
 
+    @conf_vars({("core", "fernet_key"): "", ("core", "unit_test_mode"): 
"True"})
+    def test_variable_no_encryption(self, session):
+        """
+        Test variables without encryption
+        """
+        crypto.get_fernet.cache_clear()
+        Variable.set(key="key", value="value", session=session)
+        test_var = session.query(Variable).filter(Variable.key == "key").one()
+        assert not test_var.is_encrypted
+        assert test_var.val == "value"
+        # We always call mask_secret for variables, and let the SecretsMasker 
decide based on the name if it
+        # should mask anything. That logic is tested in test_secrets_masker.py
+        self.mask_secret.assert_called_once_with("value", "key")
+
     @conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()})
     def test_variable_with_encryption(self, session):
         """

Reply via email to