jedcunningham commented on code in PR #39246:
URL: https://github.com/apache/airflow/pull/39246#discussion_r1578894900


##########
airflow/migrations/versions/0140_2_9_0_update_trigger_kwargs_type.py:
##########
@@ -38,13 +41,43 @@
 airflow_version = "2.9.0"
 
 
+def get_session() -> sa.orm.Session:
+    conn = op.get_bind()
+    sessionmaker = sa.orm.sessionmaker()
+    return sessionmaker(bind=conn)
+
 def upgrade():
-    """Update trigger kwargs type to string"""
+    """Update trigger kwargs type to string and encrypt"""
     with op.batch_alter_table("trigger") as batch_op:
         batch_op.alter_column("kwargs", type_=sa.Text(), )
 
+    if not context.is_offline_mode():
+        session = get_session()
+        try:
+            for trigger in session.query(Trigger):
+                trigger.kwargs = trigger.kwargs
+            session.commit()
+        finally:
+            session.close()
+
 
 def downgrade():
-    """Unapply update trigger kwargs type to string"""
+    """Unapply update trigger kwargs type to string and encrypt"""
+    if context.is_offline_mode():
+        print(dedent("""
+        ------------
        --  WARNING: Unable to decrypt trigger kwargs automatically in offline mode!

Review Comment:
   I don't think we can do any better than spitting out a warning :(.
   
   But if folks have ideas here, happy to hear them.



##########
airflow/models/trigger.py:
##########
@@ -116,7 +116,15 @@ def _decrypt_kwargs(encrypted_kwargs: str) -> dict[str, Any]:
         from airflow.models.crypto import get_fernet
         from airflow.serialization.serialized_objects import BaseSerialization
 
-        decrypted_kwargs = json.loads(get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8"))
+        # We weren't able to encrypt the kwargs in all migration paths,
+        # so we need to handle the case where they are not encrypted.
+        # Triggers aren't long lasting, so we can skip encrypting them now.

Review Comment:
   We could start encrypting them at this point if we'd like to, but I'd suggest we do it in a follow-up PR.



##########
tests/models/test_trigger.py:
##########
@@ -378,3 +380,19 @@ def test_serialize_sensitive_kwargs():
     assert isinstance(trigger_row.encrypted_kwargs, str)
     assert "value1" not in trigger_row.encrypted_kwargs
     assert "value2" not in trigger_row.encrypted_kwargs
+
+
+def test_kwargs_not_encrypted():
+    """
+    Tests that we don't decrypt kwargs if they aren't encrypted.
+    We weren't able to encrypt the kwargs in all migration paths.
+    """
+    trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    # force the `encrypted_kwargs` to be unencrypted, like they would be after an upgrade
+    trigger.encrypted_kwargs = json.dumps(
+        BaseSerialization.serialize({"param1": "value1", "param2": "value2"})
+    )
+    print(trigger.encrypted_kwargs)

Review Comment:
   ```suggestion
   ```
   
   Nothing to see here 🤦



##########
airflow/models/trigger.py:
##########
@@ -116,7 +116,15 @@ def _decrypt_kwargs(encrypted_kwargs: str) -> dict[str, Any]:
         from airflow.models.crypto import get_fernet
         from airflow.serialization.serialized_objects import BaseSerialization
 
-        decrypted_kwargs = json.loads(get_fernet().decrypt(encrypted_kwargs.encode("utf-8")).decode("utf-8"))
+        # We weren't able to encrypt the kwargs in all migration paths,
+        # so we need to handle the case where they are not encrypted.
+        # Triggers aren't long lasting, so we can skip encrypting them now.
+        if encrypted_kwargs.startswith("{"):
+            decrypted_kwargs = json.loads(encrypted_kwargs)

Review Comment:
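   This fixes the offline upgrade path: the data migration is skipped in offline mode, so rows can still hold plain JSON, and this check detects that the kwargs aren't actually encrypted before trying to decrypt them.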
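
For readers following along, here is a minimal, standard-library-only sketch of that fallback. It is not the code in this PR: `load_kwargs`, `fake_encrypt`, and `fake_decrypt` are hypothetical names, and base64 stands in for the Fernet call (`get_fernet().decrypt(...)`) shown in the diff. The `BaseSerialization` step is omitted.

```python
from __future__ import annotations

import base64
import json
from typing import Any, Callable


def load_kwargs(stored: str, decrypt: Callable[[bytes], bytes]) -> dict[str, Any]:
    """Return kwargs from a column that may hold plain JSON or ciphertext."""
    if stored.startswith("{"):
        # A leading "{" means a plain JSON object: the row was never
        # encrypted (for example, after an offline upgrade).
        return json.loads(stored)
    # Otherwise treat the value as ciphertext and decrypt it first.
    return json.loads(decrypt(stored.encode("utf-8")).decode("utf-8"))


# Stand-in "cipher" for demonstration only (assumption: NOT real encryption).
def fake_encrypt(data: bytes) -> bytes:
    return base64.urlsafe_b64encode(data)


def fake_decrypt(token: bytes) -> bytes:
    return base64.urlsafe_b64decode(token)


payload = {"param1": "value1", "param2": "value2"}
plain = json.dumps(payload)
encrypted = fake_encrypt(plain.encode("utf-8")).decode("utf-8")

# Both storage forms resolve to the same kwargs.
from_plain = load_kwargs(plain, fake_decrypt)
from_encrypted = load_kwargs(encrypted, fake_decrypt)
```

The prefix check works in this sketch because URL-safe base64 output never contains `{`. Fernet tokens are also URL-safe base64, so the same reasoning should carry over to the real column.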



##########
tests/models/test_trigger.py:
##########
@@ -378,3 +380,19 @@ def test_serialize_sensitive_kwargs():
     assert isinstance(trigger_row.encrypted_kwargs, str)
     assert "value1" not in trigger_row.encrypted_kwargs
     assert "value2" not in trigger_row.encrypted_kwargs
+
+
+def test_kwargs_not_encrypted():
+    """
+    Tests that we don't decrypt kwargs if they aren't encrypted.
+    We weren't able to encrypt the kwargs in all migration paths.
+    """
+    trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    # force the `encrypted_kwargs` to be unencrypted, like they would be after an upgrade

Review Comment:
   ```suggestion
       # force the `encrypted_kwargs` to be unencrypted, like they would be after an offline upgrade
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
