ksh24865 commented on PR #48547:
URL: https://github.com/apache/airflow/pull/48547#issuecomment-2769506852
Additionally, it's technically possible to check the session's dialect first
and then branch accordingly to use `INSERT ... ON CONFLICT DO UPDATE (or the
equivalent)` with SQLAlchemy Core, depending on the specific database.
However, the Variable.val column is actually encrypted and decrypted through
set_val and get_val, respectively. Because of this, it's difficult to use
SQLAlchemy Core for this logic, as it would bypass the ORM-level hooks involved
in encryption and decryption.
Reference:
```
def get_val(self):
"""Get Airflow Variable from Metadata DB and decode it using the
Fernet Key."""
from cryptography.fernet import InvalidToken as InvalidFernetToken
if self._val is not None and self.is_encrypted:
try:
fernet = get_fernet()
return fernet.decrypt(bytes(self._val, "utf-8")).decode()
except InvalidFernetToken:
self.log.error("Can't decrypt _val for key=%s, invalid token
or value", self.key)
return None
except Exception:
self.log.error("Can't decrypt _val for key=%s, FERNET_KEY
configuration missing", self.key)
return None
else:
return self._val
def set_val(self, value):
"""Encode the specified value with Fernet Key and store it in
Variables Table."""
if value is not None:
fernet = get_fernet()
self._val = fernet.encrypt(bytes(value, "utf-8")).decode()
self.is_encrypted = fernet.is_encrypted
@declared_attr
def val(cls):
"""Get Airflow Variable from Metadata DB and decode it using the
Fernet Key."""
return synonym("_val", descriptor=property(cls.get_val, cls.set_val))
```
I’d appreciate your thoughts on this. Thank you!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]