mobuchowski commented on code in PR #66342:
URL: https://github.com/apache/airflow/pull/66342#discussion_r3242630804
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py:
##########
@@ -103,25 +108,52 @@ def get_or_create_openlineage_client(self) ->
OpenLineageClient:
return self._client
def get_openlineage_config(self) -> dict | None:
- # First, try to read from YAML file
+ # First, try to read from Airflow connection
+ openlineage_config_conn_id = conf.config_conn_id()
+ if openlineage_config_conn_id:
+ config =
AirflowConnectionConfigProvider(openlineage_config_conn_id).get_config()
+ self._resolve_airflow_connection_auth(config=config,
config_conn_id=openlineage_config_conn_id)
+ return config
+ self.log.debug("OpenLineage config_conn_id configuration not found.")
+
+ # Second, try to read from YAML file
openlineage_config_path = conf.config_path(check_legacy_env_var=False)
if openlineage_config_path:
- config = self._read_yaml_config(openlineage_config_path)
- return config
+ yaml_config = self._read_yaml_config(openlineage_config_path)
+ if yaml_config is None:
+ return None
+ self._resolve_airflow_connection_auth(yaml_config)
+ return yaml_config
self.log.debug("OpenLineage config_path configuration not found.")
- # Second, try to get transport config
+ # Third, try to get transport config
transport_config = conf.transport()
if not transport_config:
self.log.debug("OpenLineage transport configuration not found.")
return None
- return {"transport": transport_config}
+ config = {"transport": transport_config}
+ self._resolve_airflow_connection_auth(config)
+ return config
@staticmethod
- def _read_yaml_config(path: str) -> dict | None:
+ def _read_yaml_config(path: str) -> dict[str, Any] | None:
with open(path) as config_file:
return yaml.safe_load(config_file)
+ @staticmethod
+ def _resolve_airflow_connection_auth(
+ config: dict[str, Any] | None, config_conn_id: str | None = None
+ ) -> None:
+ if not isinstance(config, dict):
+ return
+
+ for key, value in config.items():
+ if isinstance(value, dict) and value.get("type") ==
AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE:
+ provider = AirflowConnectionTokenProvider(value,
default_conn_id=config_conn_id)
+ config[key] = {"type": "api_key", "apiKey":
provider.get_api_key()}
+ else:
+ OpenLineageAdapter._resolve_airflow_connection_auth(value,
config_conn_id=config_conn_id)
Review Comment:
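I think this won't work with the composite transport: its per-transport configs live inside the `transports` list, but the recursion only descends into dict values, so list items (and any `auth` blocks inside them) are never resolved. For example: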
```
{"type": "composite", "transports": [{"auth": {"type": "airflow_connection_api_key", ...}}]}
```
Can we make it handle that case? Something like:
```python
@staticmethod
def _resolve_airflow_connection_auth(
config: dict[str, Any] | None, config_conn_id: str | None = None
) -> None:
if not isinstance(config, dict):
return
for key, value in config.items():
if isinstance(value, dict) and key == "auth" and value.get("type")
== AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE:
provider = AirflowConnectionTokenProvider(value,
default_conn_id=config_conn_id)
config[key] = {"type": "api_key", "apiKey":
provider.get_api_key()}
elif key == "transports" and isinstance(value, list):
for item in value:
OpenLineageAdapter._resolve_airflow_connection_auth(item,
config_conn_id=config_conn_id)
else:
OpenLineageAdapter._resolve_airflow_connection_auth(value,
config_conn_id=config_conn_id)
```
##########
providers/openlineage/src/airflow/providers/openlineage/conf.py:
##########
@@ -54,6 +54,12 @@ def config_path(check_legacy_env_var: bool = True) -> str:
return option
+@cache
+def config_conn_id() -> str:
+ """[openlineage] config_conn_id."""
+ return conf.get(_CONFIG_SECTION, "config_conn_id", fallback="")
Review Comment:
We also need to add the new configuration option (`config_conn_id`) to `provider.yaml`, so that the docs are generated properly.
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py:
##########
@@ -103,25 +108,52 @@ def get_or_create_openlineage_client(self) ->
OpenLineageClient:
return self._client
def get_openlineage_config(self) -> dict | None:
- # First, try to read from YAML file
+ # First, try to read from Airflow connection
+ openlineage_config_conn_id = conf.config_conn_id()
+ if openlineage_config_conn_id:
+ config =
AirflowConnectionConfigProvider(openlineage_config_conn_id).get_config()
+ self._resolve_airflow_connection_auth(config=config,
config_conn_id=openlineage_config_conn_id)
+ return config
+ self.log.debug("OpenLineage config_conn_id configuration not found.")
+
+ # Second, try to read from YAML file
openlineage_config_path = conf.config_path(check_legacy_env_var=False)
if openlineage_config_path:
- config = self._read_yaml_config(openlineage_config_path)
- return config
+ yaml_config = self._read_yaml_config(openlineage_config_path)
+ if yaml_config is None:
+ return None
+ self._resolve_airflow_connection_auth(yaml_config)
+ return yaml_config
self.log.debug("OpenLineage config_path configuration not found.")
- # Second, try to get transport config
+ # Third, try to get transport config
transport_config = conf.transport()
if not transport_config:
self.log.debug("OpenLineage transport configuration not found.")
return None
- return {"transport": transport_config}
+ config = {"transport": transport_config}
+ self._resolve_airflow_connection_auth(config)
+ return config
@staticmethod
- def _read_yaml_config(path: str) -> dict | None:
+ def _read_yaml_config(path: str) -> dict[str, Any] | None:
with open(path) as config_file:
return yaml.safe_load(config_file)
+ @staticmethod
+ def _resolve_airflow_connection_auth(
+ config: dict[str, Any] | None, config_conn_id: str | None = None
+ ) -> None:
+ if not isinstance(config, dict):
+ return
+
+ for key, value in config.items():
+ if isinstance(value, dict) and value.get("type") ==
AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE:
Review Comment:
```suggestion
if isinstance(value, dict) and key == "auth" and
value.get("type") == AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE:
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]