kacpermuda commented on code in PR #66342:
URL: https://github.com/apache/airflow/pull/66342#discussion_r3242982959


##########
providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py:
##########
@@ -103,25 +108,52 @@ def get_or_create_openlineage_client(self) -> OpenLineageClient:
         return self._client
 
     def get_openlineage_config(self) -> dict | None:
-        # First, try to read from YAML file
+        # First, try to read from Airflow connection
+        openlineage_config_conn_id = conf.config_conn_id()
+        if openlineage_config_conn_id:
+            config = AirflowConnectionConfigProvider(openlineage_config_conn_id).get_config()
+            self._resolve_airflow_connection_auth(config=config, config_conn_id=openlineage_config_conn_id)
+            return config
+        self.log.debug("OpenLineage config_conn_id configuration not found.")
+
+        # Second, try to read from YAML file
         openlineage_config_path = conf.config_path(check_legacy_env_var=False)
         if openlineage_config_path:
-            config = self._read_yaml_config(openlineage_config_path)
-            return config
+            yaml_config = self._read_yaml_config(openlineage_config_path)
+            if yaml_config is None:
+                return None
+            self._resolve_airflow_connection_auth(yaml_config)

Review Comment:
   Should every resolve call be guarded so that it only fires when an Airflow connection is actually defined? I think it fires every time now. We need to make sure that nothing changes for users who don't set up this connection.
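
A minimal sketch of such a guard, assuming the new auth type sits under `transport.auth.type` (where the OL HTTP transport reads its `api_key` auth from). `uses_airflow_connection_auth` and `maybe_resolve` are hypothetical names, `resolve` stands in for `_resolve_airflow_connection_auth`, and nested transports (e.g. composite) are not handled here:

```python
from __future__ import annotations

from typing import Any, Callable

# Value copied from the PR's token_provider module.
AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"


def uses_airflow_connection_auth(config: dict[str, Any] | None) -> bool:
    """Return True only if the transport auth explicitly opts into Airflow-connection auth.

    Assumes auth lives at config["transport"]["auth"]["type"], as for the OL HTTP transport.
    """
    if not isinstance(config, dict):
        return False
    transport = config.get("transport")
    if not isinstance(transport, dict):
        return False
    auth = transport.get("auth")
    return isinstance(auth, dict) and auth.get("type") == AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE


def maybe_resolve(
    config: dict[str, Any] | None,
    resolve: Callable[[dict[str, Any]], None],
) -> dict[str, Any] | None:
    """Call the (hypothetical) resolver only when the config opts in; otherwise return it untouched."""
    if uses_airflow_connection_auth(config):
        resolve(config)
    return config
```

With a check like this in front of every call, YAML and `[openlineage] transport` configs that never mention `airflow_connection_api_key` are returned exactly as before.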



##########
providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py:
##########
@@ -103,25 +108,52 @@ def get_or_create_openlineage_client(self) -> OpenLineageClient:
         return self._client
 
     def get_openlineage_config(self) -> dict | None:
-        # First, try to read from YAML file
+        # First, try to read from Airflow connection
+        openlineage_config_conn_id = conf.config_conn_id()
+        if openlineage_config_conn_id:
+            config = AirflowConnectionConfigProvider(openlineage_config_conn_id).get_config()
+            self._resolve_airflow_connection_auth(config=config, config_conn_id=openlineage_config_conn_id)
+            return config
+        self.log.debug("OpenLineage config_conn_id configuration not found.")
+
+        # Second, try to read from YAML file
         openlineage_config_path = conf.config_path(check_legacy_env_var=False)
         if openlineage_config_path:
-            config = self._read_yaml_config(openlineage_config_path)
-            return config
+            yaml_config = self._read_yaml_config(openlineage_config_path)
+            if yaml_config is None:
+                return None
+            self._resolve_airflow_connection_auth(yaml_config)
+            return yaml_config
         self.log.debug("OpenLineage config_path configuration not found.")
 
-        # Second, try to get transport config
+        # Third, try to get transport config
         transport_config = conf.transport()
         if not transport_config:
             self.log.debug("OpenLineage transport configuration not found.")
             return None
-        return {"transport": transport_config}
+        config = {"transport": transport_config}
+        self._resolve_airflow_connection_auth(config)
+        return config
 
     @staticmethod
-    def _read_yaml_config(path: str) -> dict | None:
+    def _read_yaml_config(path: str) -> dict[str, Any] | None:
         with open(path) as config_file:
             return yaml.safe_load(config_file)
 
+    @staticmethod
+    def _resolve_airflow_connection_auth(

Review Comment:
   Maybe this could live on a class in the new Airflow token provider module, or even as a standalone function there? It's very specific, so maybe it doesn't need to be on the adapter?
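
A rough sketch of what a module-level function in `token_provider.py` could look like. It assumes the auth block names its connection under a hypothetical `conn_id` key (falling back to the configured `config_conn_id`), takes the key from the connection password or the first non-empty `_DEFAULT_EXTRA_KEYS` entry, and rewrites the block into the client's `api_key` auth shape (assumed here to be `{"type": "api_key", "apiKey": ...}`). `get_connection` is injected so the sketch runs without Airflow; in the provider it would be `BaseHook.get_connection`, and `FakeConnection` is only a stand-in:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable, Optional

AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
_DEFAULT_EXTRA_KEYS = ("apiKey", "api_key", "apikey", "token", "access_token")


class OpenLineageAirflowConnectionAuthError(Exception):
    """Stand-in for the PR's AirflowException subclass."""


@dataclass
class FakeConnection:
    """Stand-in for an Airflow Connection; only the attributes read below."""

    password: Optional[str] = None
    extra_dejson: dict[str, Any] = field(default_factory=dict)


def resolve_airflow_connection_auth(
    config: dict[str, Any],
    get_connection: Callable[[str], Any],
    default_conn_id: Optional[str] = None,
) -> dict[str, Any]:
    """Replace an ``airflow_connection_api_key`` auth block with a plain ``api_key`` one, in place."""
    transport = config.get("transport")
    auth = transport.get("auth") if isinstance(transport, dict) else None
    if not isinstance(auth, dict) or auth.get("type") != AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE:
        return config  # nothing opted in: leave the config untouched
    conn_id = auth.get("conn_id") or default_conn_id  # "conn_id" key is hypothetical
    if not conn_id:
        raise OpenLineageAirflowConnectionAuthError("No connection ID given for OpenLineage API key auth.")
    connection = get_connection(conn_id)
    extra = connection.extra_dejson
    # Prefer the password field, then the first non-empty well-known extra key.
    api_key = connection.password or next((extra[key] for key in _DEFAULT_EXTRA_KEYS if extra.get(key)), None)
    if not api_key:
        raise OpenLineageAirflowConnectionAuthError(f"Connection `{conn_id}` has no API key.")
    transport["auth"] = {"type": "api_key", "apiKey": api_key}
    return config
```

The adapter would then only need to import and call this one function, keeping the connection-specific logic next to the other connection helpers.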



##########
providers/openlineage/src/airflow/providers/openlineage/token_provider.py:
##########
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.providers.common.compat.sdk import AirflowException, BaseHook
+
+AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
+OPENLINEAGE_CONFIG_EXTRA_KEY = "openlineage_config"
+_DEFAULT_EXTRA_KEYS = ("apiKey", "api_key", "apikey", "token", "access_token")
+
+
+class OpenLineageAirflowConnectionAuthError(AirflowException):
+    """Raised when OpenLineage API key auth cannot be resolved from an Airflow connection."""
+
+
+class OpenLineageAirflowConnectionConfigError(AirflowException):
+    """Raised when OpenLineage config cannot be resolved from an Airflow connection."""

Review Comment:
   What connection type are we going to use? Maybe we should create an OpenLineage connection type, with a nice template for users to fill in, so it's easier to set up OL via the Airflow UI? I feel like this would be a nice feature. We don't need to make it pretty or complex in v1, but if we decide to go with a new connection type, now is the time to do it, and I think it may be a good idea. @mobuchowski wdyt?
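
For reference, a hypothetical sketch of the hook that could back such a connection type. Airflow hooks expose `conn_type` and `hook_name` and can define `get_ui_field_behaviour()` to hide, relabel, and pre-fill connection-form fields. The class below does not subclass `BaseHook` only so it runs standalone, and the field choices and placeholder config are illustrative assumptions:

```python
from __future__ import annotations

import json
from typing import Any


class OpenLineageHook:
    """Hypothetical hook backing an ``openlineage`` connection type (a real one would subclass BaseHook)."""

    conn_name_attr = "openlineage_conn_id"
    default_conn_name = "openlineage_default"
    conn_type = "openlineage"
    hook_name = "OpenLineage"

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Hide the generic fields and show an example OpenLineage config in the extra field."""
        example_config = {
            "transport": {
                "type": "http",
                "url": "https://lineage.example.com",  # placeholder URL
                "endpoint": "api/v1/lineage",
            }
        }
        return {
            "hidden_fields": ["host", "schema", "login", "port"],
            "relabeling": {"password": "API key"},
            "placeholders": {"extra": json.dumps(example_config, indent=2)},
        }
```

The hook would also need to be listed under `connection-types` in the provider's `provider.yaml` for the type to show up in the UI.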



##########
providers/openlineage/src/airflow/providers/openlineage/token_provider.py:
##########
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.providers.common.compat.sdk import AirflowException, BaseHook
+
+AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
+OPENLINEAGE_CONFIG_EXTRA_KEY = "openlineage_config"
+_DEFAULT_EXTRA_KEYS = ("apiKey", "api_key", "apikey", "token", "access_token")
+
+
+class OpenLineageAirflowConnectionAuthError(AirflowException):
+    """Raised when OpenLineage API key auth cannot be resolved from an Airflow connection."""
+
+
+class OpenLineageAirflowConnectionConfigError(AirflowException):
+    """Raised when OpenLineage config cannot be resolved from an Airflow connection."""
+
+
+class AirflowConnectionConfigProvider:
+    """
+    Resolve OpenLineage client configuration from an Airflow connection.
+
+    The connection extra can contain the full OpenLineage client config, for example
+    ``{"transport": {"type": "console"}}``. For convenience, it can also contain only the transport
+    config, for example ``{"type": "console"}``.
+    """
+
+    def __init__(self, conn_id: str) -> None:
+        if not conn_id:
+            raise OpenLineageAirflowConnectionConfigError(
+                "OpenLineage connection config requires a non-empty connection ID."
+            )
+        self.conn_id = conn_id
+
+    def get_config(self) -> dict[str, Any]:
+        connection = BaseHook.get_connection(self.conn_id)
+        extra = connection.extra_dejson
+        config = self._get_config_from_extra(extra)
+        if config is not None:
+            return config
+
+        raise OpenLineageAirflowConnectionConfigError(
+            "OpenLineage connection config could not find configuration in connection "
+            f"`{self.conn_id}`. Expected full OpenLineage config or transport config in connection extra."
+        )
+
+    def _get_config_from_extra(self, extra: dict[str, Any]) -> dict[str, Any] | None:
+        if OPENLINEAGE_CONFIG_EXTRA_KEY in extra:
+            return self._validate_config(extra[OPENLINEAGE_CONFIG_EXTRA_KEY])
+
+        if "transport" in extra:
+            return self._validate_config(extra)
+
+        if "type" in extra:
+            return {"transport": extra}
+
+        return None
+
+    def _validate_config(self, config: Any) -> dict[str, Any]:

Review Comment:
   Hmm, I'm wondering if there is any way we can use OL client initialization as validation here, to avoid duplicating check logic. If not, that's fine; we'll extend this validation in the future.
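
One way this could look, as a sketch: let client construction do the checking and only translate its failure into our own error. `build_client` is injected here; in the provider it would presumably be something like `lambda c: OpenLineageClient(config=c)`, assuming the client raises on an invalid transport config at construction time (not verified here). Building a throwaway client may have side effects depending on the transport, which is worth keeping in mind:

```python
from __future__ import annotations

from typing import Any, Callable


class OpenLineageAirflowConnectionConfigError(Exception):
    """Stand-in for the PR's AirflowException subclass."""


def validate_by_building_client(
    config: Any,
    build_client: Callable[[dict[str, Any]], Any],
) -> dict[str, Any]:
    """Use client construction as the validator instead of re-implementing its checks."""
    if not isinstance(config, dict):
        # Cheap shape check kept locally: the client expects a mapping.
        raise OpenLineageAirflowConnectionConfigError(f"Expected a dict, got {type(config).__name__}.")
    try:
        build_client(config)
    except Exception as err:
        # Surface any client-side validation failure under our own error type.
        raise OpenLineageAirflowConnectionConfigError(f"Invalid OpenLineage config: {err}") from err
    return config
```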



##########
providers/openlineage/src/airflow/providers/openlineage/token_provider.py:
##########
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.providers.common.compat.sdk import AirflowException, BaseHook
+
+AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
+OPENLINEAGE_CONFIG_EXTRA_KEY = "openlineage_config"
+_DEFAULT_EXTRA_KEYS = ("apiKey", "api_key", "apikey", "token", "access_token")
+
+
+class OpenLineageAirflowConnectionAuthError(AirflowException):
+    """Raised when OpenLineage API key auth cannot be resolved from an Airflow connection."""
+
+
+class OpenLineageAirflowConnectionConfigError(AirflowException):
+    """Raised when OpenLineage config cannot be resolved from an Airflow connection."""

Review Comment:
   I'm not sure that just looking for `openlineage_config` in any connection won't confuse end users. The first thing you choose when defining an Airflow connection is its type, so we should probably not be saying "choose whatever" when we can say "choose the OpenLineage type".
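
A sketch of the stricter lookup, assuming a hypothetical `openlineage` connection type. `FakeConnection` only mimics the two `Connection` attributes used (`conn_type`, `extra_dejson`) so the example runs without Airflow:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

OPENLINEAGE_CONN_TYPE = "openlineage"  # hypothetical connection type name


class OpenLineageAirflowConnectionConfigError(Exception):
    """Stand-in for the PR's AirflowException subclass."""


@dataclass
class FakeConnection:
    """Stand-in for an Airflow Connection; only the attributes read below."""

    conn_type: str
    extra_dejson: dict[str, Any] = field(default_factory=dict)


def get_openlineage_extra(connection: Any, conn_id: str) -> dict[str, Any]:
    """Read config only from connections of the dedicated type, instead of probing any connection's extra."""
    if connection.conn_type != OPENLINEAGE_CONN_TYPE:
        raise OpenLineageAirflowConnectionConfigError(
            f"Connection `{conn_id}` has type `{connection.conn_type}`; expected `{OPENLINEAGE_CONN_TYPE}`."
        )
    return connection.extra_dejson
```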



##########
providers/openlineage/src/airflow/providers/openlineage/token_provider.py:
##########
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.providers.common.compat.sdk import AirflowException, BaseHook
+
+AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
+OPENLINEAGE_CONFIG_EXTRA_KEY = "openlineage_config"
+_DEFAULT_EXTRA_KEYS = ("apiKey", "api_key", "apikey", "token", "access_token")
+
+
+class OpenLineageAirflowConnectionAuthError(AirflowException):
+    """Raised when OpenLineage API key auth cannot be resolved from an Airflow connection."""
+
+
+class OpenLineageAirflowConnectionConfigError(AirflowException):
+    """Raised when OpenLineage config cannot be resolved from an Airflow connection."""
+
+
+class AirflowConnectionConfigProvider:
+    """
+    Resolve OpenLineage client configuration from an Airflow connection.
+
+    The connection extra can contain the full OpenLineage client config, for example
+    ``{"transport": {"type": "console"}}``. For convenience, it can also contain only the transport
+    config, for example ``{"type": "console"}``.
+    """
+
+    def __init__(self, conn_id: str) -> None:
+        if not conn_id:
+            raise OpenLineageAirflowConnectionConfigError(
+                "OpenLineage connection config requires a non-empty connection ID."
+            )
+        self.conn_id = conn_id
+
+    def get_config(self) -> dict[str, Any]:
+        connection = BaseHook.get_connection(self.conn_id)
+        extra = connection.extra_dejson
+        config = self._get_config_from_extra(extra)
+        if config is not None:
+            return config
+
+        raise OpenLineageAirflowConnectionConfigError(
+            "OpenLineage connection config could not find configuration in connection "
+            f"`{self.conn_id}`. Expected full OpenLineage config or transport config in connection extra."
+        )
+
+    def _get_config_from_extra(self, extra: dict[str, Any]) -> dict[str, Any] | None:
+        if OPENLINEAGE_CONFIG_EXTRA_KEY in extra:
+            return self._validate_config(extra[OPENLINEAGE_CONFIG_EXTRA_KEY])
+
+        if "transport" in extra:
+            return self._validate_config(extra)
+
+        if "type" in extra:
+            return {"transport": extra}
+
+        return None

Review Comment:
   I think this can be simplified: if we go with an OpenLineage connection type, we can just assume its extra is always the entire config, so we look for `transport`, as always.
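
Roughly, assuming the dedicated connection type: the whole extra is the config, with no `openlineage_config` key and no bare-transport shortcut. The function name and error message are illustrative:

```python
from __future__ import annotations

from typing import Any


class OpenLineageAirflowConnectionConfigError(Exception):
    """Stand-in for the PR's AirflowException subclass."""


def get_config_from_extra(extra: dict[str, Any], conn_id: str) -> dict[str, Any]:
    """Treat the whole extra as the OpenLineage config; require a ``transport`` object with a ``type``."""
    transport = extra.get("transport")
    if not isinstance(transport, dict) or not transport.get("type"):
        raise OpenLineageAirflowConnectionConfigError(
            f"Connection `{conn_id}` extra must contain a `transport` object with a `type`."
        )
    return extra
```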



##########
providers/openlineage/src/airflow/providers/openlineage/token_provider.py:
##########
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.providers.common.compat.sdk import AirflowException, BaseHook
+
+AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
+OPENLINEAGE_CONFIG_EXTRA_KEY = "openlineage_config"
+_DEFAULT_EXTRA_KEYS = ("apiKey", "api_key", "apikey", "token", "access_token")
+
+
+class OpenLineageAirflowConnectionAuthError(AirflowException):
+    """Raised when OpenLineage API key auth cannot be resolved from an Airflow connection."""
+
+
+class OpenLineageAirflowConnectionConfigError(AirflowException):
+    """Raised when OpenLineage config cannot be resolved from an Airflow connection."""
+
+
+class AirflowConnectionConfigProvider:
+    """
+    Resolve OpenLineage client configuration from an Airflow connection.
+
+    The connection extra can contain the full OpenLineage client config, for example
+    ``{"transport": {"type": "console"}}``. For convenience, it can also contain only the transport
+    config, for example ``{"type": "console"}``.
+    """
+
+    def __init__(self, conn_id: str) -> None:
+        if not conn_id:
+            raise OpenLineageAirflowConnectionConfigError(
+                "OpenLineage connection config requires a non-empty connection ID."
+            )
+        self.conn_id = conn_id
+
+    def get_config(self) -> dict[str, Any]:
+        connection = BaseHook.get_connection(self.conn_id)
+        extra = connection.extra_dejson
+        config = self._get_config_from_extra(extra)
+        if config is not None:
+            return config
+
+        raise OpenLineageAirflowConnectionConfigError(
+            "OpenLineage connection config could not find configuration in connection "
+            f"`{self.conn_id}`. Expected full OpenLineage config or transport config in connection extra."
+        )
+
+    def _get_config_from_extra(self, extra: dict[str, Any]) -> dict[str, Any] | None:
+        if OPENLINEAGE_CONFIG_EXTRA_KEY in extra:
+            return self._validate_config(extra[OPENLINEAGE_CONFIG_EXTRA_KEY])
+
+        if "transport" in extra:
+            return self._validate_config(extra)
+
+        if "type" in extra:
+            return {"transport": extra}

Review Comment:
   No validation here :)? Might not be relevant after the other comments.
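
If the bare-transport shortcut stays, a sketch of routing it through the same validation as the other branches. `validate_config` here is a hypothetical minimal check, since the PR's `_validate_config` body isn't shown in this hunk:

```python
from __future__ import annotations

from typing import Any

OPENLINEAGE_CONFIG_EXTRA_KEY = "openlineage_config"


class OpenLineageAirflowConnectionConfigError(Exception):
    """Stand-in for the PR's AirflowException subclass."""


def validate_config(config: Any) -> dict[str, Any]:
    """Hypothetical minimal check: a dict whose ``transport`` is a dict with a string ``type``."""
    transport = config.get("transport") if isinstance(config, dict) else None
    if not isinstance(transport, dict) or not isinstance(transport.get("type"), str):
        raise OpenLineageAirflowConnectionConfigError(
            "OpenLineage config must be a dict with a `transport` dict that has a string `type`."
        )
    return config


def get_config_from_extra(extra: dict[str, Any]) -> dict[str, Any] | None:
    if OPENLINEAGE_CONFIG_EXTRA_KEY in extra:
        return validate_config(extra[OPENLINEAGE_CONFIG_EXTRA_KEY])
    if "transport" in extra:
        return validate_config(extra)
    if "type" in extra:
        # Wrap the bare transport first, then run it through the same validation as the other branches.
        return validate_config({"transport": extra})
    return None
```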


