Copilot commented on code in PR #57610:
URL: https://github.com/apache/airflow/pull/57610#discussion_r2761728647


##########
providers/informatica/provider.yaml:
##########
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-informatica
+name: Informatica Airflow
+description: |
+  `Informatica <https://www.informatica.com//>`__
+
+state: ready
+source-date-epoch: 1758787152
+# Note that those versions are maintained by release manager - do not update them manually
+# with the exception of case where other provider in sources has >= new provider version.
+# In such case adding >= NEW_VERSION and bumping to NEW_VERSION in a provider have
+# to be done in the same PR
+versions:
+  - 0.1.0
+
+integrations:
+  - integration-name: Informatica
+    external-doc-url: https://www.informatica.com/
+    logo: /docs/integration-logos/informatica.png
+    tags: [protocol]
+
+hooks:
+  - integration-name: Informatica
+    python-modules:
+      - airflow.providers.informatica.hooks.edc
+
+connection-types:
+  - hook-class-name: airflow.providers.informatica.hooks.edc.InformaticaEDCHook
+    connection-type: informatica_edc
+
+plugins:
+  - name: informatica
+    plugin-class: airflow.providers.informatica.plugins.InformaticaProviderPlugin
+

Review Comment:
   PR description mentions adding an operator, but this provider metadata currently only declares hooks and plugins (no operators are listed here, and there is no `operators` module under `providers/informatica/src`). Either add the operator(s) and list them under `operators:` or update the PR description to match the actual scope.

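   If the operator route is taken, the metadata entry would mirror the existing `hooks:` section, roughly as sketched below (the module path is a hypothetical placeholder, since no operator module exists in the PR yet):

   ```yaml
   operators:
     - integration-name: Informatica
       python-modules:
         # Hypothetical module path -- the module itself would need to be created
         # under providers/informatica/src/airflow/providers/informatica/operators/
         - airflow.providers.informatica.operators.edc
   ```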


##########
providers/informatica/src/airflow/providers/informatica/plugins/informatica.py:
##########
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.configuration import conf
+from airflow.plugins_manager import AirflowPlugin
+
+is_disabled = conf.getboolean("informatica", "disabled", fallback=False)
+# Conditional imports - only load expensive dependencies when plugin is enabled
+if not is_disabled:
+    from airflow.providers.common.compat.sdk import HookLineageReader
+    from airflow.providers.informatica.plugins.listener import get_informatica_listener
+
+
+class InformaticaProviderPlugin(AirflowPlugin):
+    """
+    Listener that emits numerous Events.
+
+    Informatica Plugin provides listener that emits OL events on DAG start,
+    complete and failure and TaskInstances start, complete and failure.
+    """

Review Comment:
   This plugin docstring claims it emits events on DAG start/complete/failure, but the plugin only registers a task-instance listener. Please adjust the docstring (or add DAG-run event handling) so user-facing docs don’t overpromise functionality.



##########
airflow-core/tests/unit/plugins/test_plugins_manager.py:
##########
@@ -359,4 +359,4 @@ def test_does_not_double_import_entrypoint_provider_plugins(self):
         # Mock/skip loading from plugin dir
         with mock.patch("airflow.plugins_manager._load_plugins_from_plugin_directory", return_value=([], [])):
             plugins = plugins_manager._get_plugins()[0]
-        assert len(plugins) == 4
+        assert len(plugins) == 5

Review Comment:
   This test only asserts the total number of loaded plugins, which is brittle (changes whenever any provider adds/removes a plugin) and doesn’t directly verify the “does not double import” behavior. Consider asserting that the entrypoint plugin module appears exactly once in `plugins` (or that provider plugins are de-duplicated), instead of hard-coding the count.
   ```suggestion

           # Ensure we loaded at least one plugin and that the entrypoint plugin is not imported twice.
           assert len(plugins) >= 1
           plugin_module_names = [p.__name__ for p in plugins if hasattr(p, "__name__")]
           assert plugin_module_names.count(mock_entrypoint.module) == 1
   ```



##########
providers/informatica/docs/conf.py:
##########
@@ -0,0 +1,27 @@
+# Disable Flake8 because of all the sphinx imports
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Configuration of Providers docs building."""
+
+from __future__ import annotations
+
+import os
+
+os.environ["AIRFLOW_PACKAGE_NAME"] = "apache-airflow-providers-informatica"
+
+from docs.provider_conf import *  # noqa: F403

Review Comment:
   Import pollutes the enclosing namespace, as the imported module `docs.provider_conf` does not define `__all__`.

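   To make the concern concrete, here is a small self-contained demonstration (using a throwaway stand-in module, not the real `docs.provider_conf`) of how a missing `__all__` lets dependency imports leak through `import *`:

```python
import sys
import types

# Build a throwaway stand-in module (hypothetical, for illustration only).
fake = types.ModuleType("fake_provider_conf")
fake.project = "apache-airflow-providers-informatica"  # a genuine setting
fake.os = __import__("os")  # a dependency import that will leak
sys.modules["fake_provider_conf"] = fake

ns = {}
exec("from fake_provider_conf import *", ns)
# Without __all__, the imported `os` module leaks into the consumer namespace.
assert "os" in ns and "project" in ns

fake.__all__ = ["project"]
ns = {}
exec("from fake_provider_conf import *", ns)
# With __all__ defined, only the listed names are exported.
assert "project" in ns and "os" not in ns
```

   The usual remedies in `conf.py` are defining `__all__` in `docs.provider_conf` or importing the needed names explicitly.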


##########
providers/informatica/src/airflow/providers/informatica/plugins/listener.py:
##########
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from airflow.listeners import hookimpl
+from airflow.providers.informatica.extractors import InformaticaLineageExtractor
+from airflow.providers.informatica.hooks.edc import InformaticaEDCHook
+
+if TYPE_CHECKING:
+    from airflow.models import TaskInstance
+    from airflow.utils.state import TaskInstanceState
+
+_informatica_listener: InformaticaListener | None = None
+
+
+class InformaticaListener:
+    """Informatica listener sends events on task instance and dag run starts, completes and failures."""
+

Review Comment:
   The class docstring says the listener handles DAG run start/complete/failure events, but this implementation only defines task instance callbacks (`on_task_instance_*`). Update the docstring to match current behavior or add the missing DAG run hook implementations.

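   If the second route is chosen, the missing callbacks would follow Airflow's DAG-run listener spec. A minimal standalone sketch (method names assume `airflow.listeners.spec.dagrun`; the `@hookimpl` decorators and real lineage emission are omitted so the snippet runs without Airflow installed):

```python
class InformaticaListenerSketch:
    """Hypothetical shape of the missing DAG-run callbacks (sketch only)."""

    def __init__(self):
        self.events = []

    # In the provider, each method below would carry @hookimpl and emit a
    # lineage event instead of recording a string.
    def on_dag_run_running(self, dag_run, msg):
        self.events.append("running")

    def on_dag_run_success(self, dag_run, msg):
        self.events.append("success")

    def on_dag_run_failed(self, dag_run, msg):
        self.events.append("failed")


listener = InformaticaListenerSketch()
listener.on_dag_run_running(dag_run=None, msg="")
listener.on_dag_run_success(dag_run=None, msg="")
assert listener.events == ["running", "success"]
```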


##########
providers/informatica/src/airflow/providers/informatica/hooks/edc.py:
##########
@@ -0,0 +1,245 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import base64
+import re
+from collections.abc import Mapping, MutableMapping
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any
+
+from requests.exceptions import RequestException
+
+from airflow.configuration import conf
+from airflow.providers.http.hooks.http import HttpHook
+
+if TYPE_CHECKING:
+    from requests import Response
+
+    from airflow.providers.common.compat.sdk import Connection
+
+
+class InformaticaEDCError(RuntimeError):
+    """Raised when the Informatica Enterprise Data Catalog API returns an error."""
+
+
+@dataclass(frozen=True)
+class InformaticaConnectionConfig:
+    """Container for Informatica EDC connection settings."""
+
+    base_url: str
+    username: str | None
+    password: str | None
+    security_domain: str | None
+    verify_ssl: bool
+    request_timeout: int
+    provider_id: str
+    modified_by: str | None
+
+    @property
+    def auth_header(self) -> str | None:
+        """Return the authorization header for the configured credentials."""
+        if not self.username:
+            return None
+
+        domain_prefix = f"{self.security_domain}\\" if self.security_domain else ""
+        credential = f"{domain_prefix}{self.username}:{self.password or ''}"
+        token = base64.b64encode(bytes(credential, "utf-8")).decode("utf-8")
+        return f"Basic {token}"
+
+
+class InformaticaEDCHook(HttpHook):
+    """Hook providing a minimal client for the Informatica EDC REST API."""
+
+    conn_name_attr = "informatica_edc_conn_id"
+    default_conn_name = conf.get("informatica", "default_conn_id", fallback="informatica_edc_default")
+    conn_type = "informatica_edc"
+    hook_name = "Informatica EDC"
+    _lineage_association = "core.DataSetDataFlow"
+
+    def __init__(
+        self,
+        informatica_edc_conn_id: str = default_conn_name,
+        *,
+        request_timeout: int | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(http_conn_id=informatica_edc_conn_id, method="GET", **kwargs)
+        self._config: InformaticaConnectionConfig | None = None
+        self._request_timeout = request_timeout or conf.getint("informatica", "request_timeout", fallback=30)
+
+    @property
+    def config(self) -> InformaticaConnectionConfig:
+        """Return cached connection configuration."""
+        if self._config is None:
+            connection = self.get_connection(self.http_conn_id)
+            self._config = self._build_connection_config(connection)
+        return self._config
+
+    def _build_connection_config(self, connection: Connection) -> InformaticaConnectionConfig:
+        """Build a configuration object from an Airflow connection."""
+        host = connection.host or ""
+        schema = connection.schema or "https"
+        if host.startswith("http://") or host.startswith("https://"):
+            base_url = host
+        else:
+            base_url = f"{schema}://{host}" if host else f"{schema}://"
+        if connection.port:
+            base_url = f"{base_url}:{connection.port}"
+
+        extras: MutableMapping[str, Any] = connection.extra_dejson or {}
+        verify_ssl_raw = extras.get("verify_ssl", extras.get("verify", True))
+        verify_ssl = str(verify_ssl_raw).lower() not in {"0", "false", "no"}
+
+        provider_id = str(extras.get("provider_id", "enrichment"))
+        modified_by = str(extras.get("modified_by", connection.login or "airflow"))
+        security_domain = extras.get("security_domain") or extras.get("domain")
+
+        return InformaticaConnectionConfig(
+            base_url=base_url.rstrip("/"),
+            username=connection.login,
+            password=connection.password,
+            security_domain=str(security_domain) if security_domain else None,
+            verify_ssl=verify_ssl,
+            request_timeout=self._request_timeout,
+            provider_id=provider_id,
+            modified_by=modified_by,
+        )
+
+    def close_session(self) -> None:
+        pass
+
+    def get_conn(
+        self,
+        headers: dict[str, Any] | None = None,
+        extra_options: dict[str, Any] | None = None,
+    ) -> Any:
+        """Return a configured session augmented with Informatica specific headers."""
+        session = super().get_conn(headers=headers, extra_options=extra_options)
+        session.verify = self.config.verify_ssl
+        session.headers.update({"Accept": "application/json", "Content-Type": "application/json"})
+        if self.config.auth_header:
+            session.headers["Authorization"] = self.config.auth_header
+        return session
+
+    def _build_url(self, endpoint: str) -> str:
+        endpoint = endpoint if endpoint.startswith("/") else f"/{endpoint}"
+        return f"{self.config.base_url}{endpoint}"
+
+    def _request(
+        self,
+        method: str,
+        endpoint: str,
+        *,
+        params: Mapping[str, Any] | None = None,
+        json: Mapping[str, Any] | None = None,
+    ) -> Response:
+        """Execute an HTTP request and raise :class:`InformaticaEDCError` on failure."""
+        url = self._build_url(endpoint)
+        session = self.get_conn()
+        try:
+            response = session.request(
+                method=method.upper(),
+                url=url,
+                params=params,
+                json=json,
+                timeout=self.config.request_timeout,
+            )
+        except RequestException as exc:
+            raise InformaticaEDCError(f"Failed to call Informatica EDC endpoint {endpoint}: {exc}") from exc
+
+        if response.ok:
+            return response
+
+        message = response.text or response.reason
+        raise InformaticaEDCError(
+            f"Informatica EDC request to {endpoint} returned {response.status_code}: {message}"
+        )
+
+    def _encode_id(self, id, tilde=False):
+        """
+        Encode an ID to be safe. Return String.

Review Comment:
   `_encode_id(self, id, ...)` uses `id` as a parameter name (shadowing Python’s built-in `id()`), and the method is missing type annotations. Renaming the parameter (e.g., `object_id: str`) and annotating `tilde: bool` would improve readability and static checking.

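   As a standalone illustration of the suggested signature (the real method body is truncated in the diff, so the percent-encoding behavior below, including the `tilde` handling, is an assumption):

```python
from urllib.parse import quote


def encode_object_id(object_id: str, tilde: bool = False) -> str:
    """Percent-encode an ID for safe use in a URL path.

    Hypothetical sketch: renames the shadowing `id` parameter and adds type
    annotations. The `tilde` flag additionally escapes '~', which `quote`
    leaves alone because it is an RFC 3986 unreserved character.
    """
    encoded = quote(object_id, safe="")  # also encode '/' and ':'
    if tilde:
        encoded = encoded.replace("~", "%7E")
    return encoded
```

   For example, `encode_object_id("x~y", tilde=True)` yields `"x%7Ey"`.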


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]