jason810496 commented on code in PR #48547:
URL: https://github.com/apache/airflow/pull/48547#discussion_r2037703181


##########
airflow-core/src/airflow/models/variable.py:
##########
@@ -230,8 +230,61 @@ def set(
         else:
             stored_value = str(value)
 
-        Variable.delete(key, session=session)
-        session.add(Variable(key=key, val=stored_value, description=description))
+        new_variable = Variable(key=key, val=stored_value, description=description)
+

Review Comment:
   ```suggestion
           # Map of dialect names to their corresponding module paths
           dialect_insert_map = {
               "postgresql": "sqlalchemy.dialects.postgresql.insert",
               "mysql": "sqlalchemy.dialects.mysql.insert",
               "sqlite": "sqlalchemy.dialects.sqlite.insert",
           }
   ```



##########
airflow-core/src/airflow/models/variable.py:
##########
@@ -230,8 +230,61 @@ def set(
         else:
             stored_value = str(value)
 
-        Variable.delete(key, session=session)
-        session.add(Variable(key=key, val=stored_value, description=description))
+        new_variable = Variable(key=key, val=stored_value, description=description)
+
+        # Perform dialect-specific upsert operation
+        dialect_name = session.get_bind().dialect.name
+
+        # Use SQLAlchemy Core for supported dialects
+        if dialect_name in ("postgresql", "mysql", "sqlite"):

Review Comment:
   ```suggestion
           if dialect_name in dialect_insert_map:
               # Dynamically import the insert function
               insert = import_string(dialect_insert_map[dialect_name])
   ```
   
   Note: `import_string` is an Airflow utility from `airflow.utils.module_loading`.
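
To illustrate the map-plus-dynamic-import idea, here is a minimal sketch, not the PR's actual code. It assumes a simplified stand-in for `import_string` built on `importlib` so the snippet runs without Airflow installed; `resolve_insert` and `DIALECT_INSERT_MAP` are hypothetical names used only for this example.

```python
from importlib import import_module
from typing import Any, Callable, Optional

# Dotted paths copied from the suggested dialect_insert_map.
DIALECT_INSERT_MAP = {
    "postgresql": "sqlalchemy.dialects.postgresql.insert",
    "mysql": "sqlalchemy.dialects.mysql.insert",
    "sqlite": "sqlalchemy.dialects.sqlite.insert",
}


def import_string(dotted_path: str) -> Any:
    """Simplified stand-in (assumption) for airflow.utils.module_loading.import_string."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} doesn't look like a module path")
    module = import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError(
            f"Module {module_path!r} has no attribute {attr_name!r}"
        ) from None


def resolve_insert(dialect_name: str) -> Optional[Callable[..., Any]]:
    """Hypothetical helper: return the dialect's insert(), or None if unsupported."""
    dotted_path = DIALECT_INSERT_MAP.get(dialect_name)
    if dotted_path is None:
        # Unsupported dialect: the caller would fall back to another code path.
        return None
    # The import only happens here, for the dialect actually in use.
    return import_string(dotted_path)
```

With this shape, supporting another dialect means adding one entry to the map, and the membership check and the import stay in sync because both read from the same dictionary.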



##########
airflow-core/src/airflow/models/variable.py:
##########
@@ -230,8 +230,61 @@ def set(
         else:
             stored_value = str(value)
 
-        Variable.delete(key, session=session)
-        session.add(Variable(key=key, val=stored_value, description=description))
+        new_variable = Variable(key=key, val=stored_value, description=description)
+
+        # Perform dialect-specific upsert operation
+        dialect_name = session.get_bind().dialect.name
+
+        # Use SQLAlchemy Core for supported dialects
+        if dialect_name in ("postgresql", "mysql", "sqlite"):
+            val = new_variable._val
+            is_encrypted = new_variable.is_encrypted
+
+            # Import dialect-specific insert function
+            if dialect_name == "postgresql":
+                from sqlalchemy.dialects.postgresql import insert
+            elif dialect_name == "mysql":
+                from sqlalchemy.dialects.mysql import insert
+            elif dialect_name == "sqlite":
+                from sqlalchemy.dialects.sqlite import insert
+

Review Comment:
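   ```suggestion
   ```

   The empty suggestion deletes this `if`/`elif` import chain, which the `dialect_insert_map` lookup and `import_string` call suggested above make redundant.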


