kacpermuda commented on code in PR #66342:
URL: https://github.com/apache/airflow/pull/66342#discussion_r3242976451
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py:
##########
@@ -103,25 +108,52 @@ def get_or_create_openlineage_client(self) ->
OpenLineageClient:
return self._client
def get_openlineage_config(self) -> dict | None:
- # First, try to read from YAML file
+ # First, try to read from Airflow connection
+ openlineage_config_conn_id = conf.config_conn_id()
+ if openlineage_config_conn_id:
+ config =
AirflowConnectionConfigProvider(openlineage_config_conn_id).get_config()
+ self._resolve_airflow_connection_auth(config=config,
config_conn_id=openlineage_config_conn_id)
+ return config
+ self.log.debug("OpenLineage config_conn_id configuration not found.")
+
+ # Second, try to read from YAML file
openlineage_config_path = conf.config_path(check_legacy_env_var=False)
if openlineage_config_path:
- config = self._read_yaml_config(openlineage_config_path)
- return config
+ yaml_config = self._read_yaml_config(openlineage_config_path)
+ if yaml_config is None:
+ return None
+ self._resolve_airflow_connection_auth(yaml_config)
+ return yaml_config
self.log.debug("OpenLineage config_path configuration not found.")
- # Second, try to get transport config
+ # Third, try to get transport config
transport_config = conf.transport()
if not transport_config:
self.log.debug("OpenLineage transport configuration not found.")
return None
- return {"transport": transport_config}
+ config = {"transport": transport_config}
+ self._resolve_airflow_connection_auth(config)
+ return config
@staticmethod
- def _read_yaml_config(path: str) -> dict | None:
+ def _read_yaml_config(path: str) -> dict[str, Any] | None:
with open(path) as config_file:
return yaml.safe_load(config_file)
+ @staticmethod
+ def _resolve_airflow_connection_auth(
+ config: dict[str, Any] | None, config_conn_id: str | None = None
+ ) -> None:
+ if not isinstance(config, dict):
+ return
+
+ for key, value in config.items():
+ if isinstance(value, dict) and value.get("type") ==
AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE:
+ provider = AirflowConnectionTokenProvider(value,
default_conn_id=config_conn_id)
+ config[key] = {"type": "api_key", "apiKey":
provider.get_api_key()}
+ else:
+ OpenLineageAdapter._resolve_airflow_connection_auth(value,
config_conn_id=config_conn_id)
Review Comment:
Yup, composite transports can also be nested, so this is a good call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org