Copilot commented on code in PR #64767:
URL: https://github.com/apache/airflow/pull/64767#discussion_r3066476663
##########
providers/celery/src/airflow/providers/celery/executors/default_celery.py:
##########
@@ -141,31 +141,57 @@ def get_default_celery_config(team_conf) -> dict[str,
Any]:
try:
if celery_ssl_active:
+ ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS",
fallback=True)
+ ssl_key = team_conf.get("celery", "SSL_KEY")
+ ssl_cert = team_conf.get("celery", "SSL_CERT")
+ ssl_cacert = team_conf.get("celery", "SSL_CACERT")
+
+ if ssl_mutual_tls and (not ssl_key or not ssl_cert):
+ raise AirflowException(
+ "SSL_MUTUAL_TLS is True (default) but SSL_KEY and/or
SSL_CERT are not set. "
+ "Set both for mutual TLS, or set SSL_MUTUAL_TLS=False for
one-way TLS."
+ )
Review Comment:
The new explicit `AirflowException` raised for missing `SSL_KEY`/`SSL_CERT`
will be caught by the broad `except Exception as e` and re-wrapped as an
\"unknown Celery SSL Error\", which defeats the purpose of the clearer error
and will likely break the new test that matches the specific message. Add an
`except AirflowException: raise` before the generic handler, or make the
generic handler re-raise `AirflowException` unchanged.
##########
providers/celery/src/airflow/providers/celery/executors/default_celery.py:
##########
@@ -141,31 +141,57 @@ def get_default_celery_config(team_conf) -> dict[str,
Any]:
try:
if celery_ssl_active:
+ ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS",
fallback=True)
+ ssl_key = team_conf.get("celery", "SSL_KEY")
+ ssl_cert = team_conf.get("celery", "SSL_CERT")
+ ssl_cacert = team_conf.get("celery", "SSL_CACERT")
+
+ if ssl_mutual_tls and (not ssl_key or not ssl_cert):
+ raise AirflowException(
+ "SSL_MUTUAL_TLS is True (default) but SSL_KEY and/or
SSL_CERT are not set. "
+ "Set both for mutual TLS, or set SSL_MUTUAL_TLS=False for
one-way TLS."
+ )
+
+ if not ssl_mutual_tls and (ssl_key or ssl_cert):
+ log.warning(
+ "SSL_MUTUAL_TLS is False but SSL_KEY/SSL_CERT are
configured. "
+ "Client certificates will not be used. "
+ "Set SSL_MUTUAL_TLS=True if you intend to use mutual TLS."
+ )
+
if broker_url and re.search(r"amqps?://", broker_url):
- broker_use_ssl = {
- "keyfile": team_conf.get("celery", "SSL_KEY"),
- "certfile": team_conf.get("celery", "SSL_CERT"),
- "ca_certs": team_conf.get("celery", "SSL_CACERT"),
- "cert_reqs": ssl.CERT_REQUIRED,
- }
+ if ssl_mutual_tls:
+ broker_use_ssl = {
+ "keyfile": ssl_key,
+ "certfile": ssl_cert,
+ "ca_certs": ssl_cacert,
+ "cert_reqs": ssl.CERT_REQUIRED,
+ }
+ else:
+ broker_use_ssl = {
+ "ca_certs": ssl_cacert,
+ "cert_reqs": ssl.CERT_REQUIRED,
+ }
elif broker_url and re.search("rediss?://|sentinel://",
broker_url):
- broker_use_ssl = {
- "ssl_keyfile": team_conf.get("celery", "SSL_KEY"),
- "ssl_certfile": team_conf.get("celery", "SSL_CERT"),
- "ssl_ca_certs": team_conf.get("celery", "SSL_CACERT"),
- "ssl_cert_reqs": ssl.CERT_REQUIRED,
- }
+ if ssl_mutual_tls:
+ broker_use_ssl = {
+ "ssl_keyfile": ssl_key,
+ "ssl_certfile": ssl_cert,
+ "ssl_ca_certs": ssl_cacert,
+ "ssl_cert_reqs": ssl.CERT_REQUIRED,
+ }
+ else:
+ broker_use_ssl = {
+ "ssl_ca_certs": ssl_cacert,
+ "ssl_cert_reqs": ssl.CERT_REQUIRED,
+ }
else:
raise AirflowException(
"The broker you configured does not support SSL_ACTIVE to
be True. "
"Please use RabbitMQ or Redis if you would like to use SSL
for broker."
)
config["broker_use_ssl"] = broker_use_ssl
Review Comment:
The new explicit `AirflowException` raised for missing `SSL_KEY`/`SSL_CERT`
will be caught by the broad `except Exception as e` and re-wrapped as an
\"unknown Celery SSL Error\", which defeats the purpose of the clearer error
and will likely break the new test that matches the specific message. Add an
`except AirflowException: raise` before the generic handler, or make the
generic handler re-raise `AirflowException` unchanged.
```suggestion
config["broker_use_ssl"] = broker_use_ssl
except AirflowException:
raise
```
##########
providers/celery/src/airflow/providers/celery/executors/default_celery.py:
##########
@@ -141,31 +141,57 @@ def get_default_celery_config(team_conf) -> dict[str,
Any]:
try:
if celery_ssl_active:
+ ssl_mutual_tls = team_conf.getboolean("celery", "SSL_MUTUAL_TLS",
fallback=True)
+ ssl_key = team_conf.get("celery", "SSL_KEY")
+ ssl_cert = team_conf.get("celery", "SSL_CERT")
+ ssl_cacert = team_conf.get("celery", "SSL_CACERT")
Review Comment:
`SSL_CACERT` is central to the stated goal (\"server verification only via
SSL_CACERT\"), but it is never validated. If it is empty/missing while
`SSL_ACTIVE=True`, the connection may fail later with less actionable SSL
errors, or it may fall back to system CA behavior unexpectedly. Consider
explicitly requiring a non-empty `SSL_CACERT` whenever `SSL_ACTIVE=True` (both
mutual and one-way) and raising a targeted `AirflowException` that tells users
to set `SSL_CACERT` (or, if you intend to allow system CAs, document that and
adjust `cert_reqs`/options accordingly).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]