Copilot commented on code in PR #57610: URL: https://github.com/apache/airflow/pull/57610#discussion_r2761089771
########## providers/informatica/provider.yaml: ########## @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-informatica +name: Informatica Airflow +description: | + `Informatica <https://www.informatica.com//>`__ + +state: ready +source-date-epoch: 1758787152 +# Note that those versions are maintained by release manager - do not update them manually +# with the exception of case where other provider in sources has >= new provider version. +# In such case adding >= NEW_VERSION and bumping to NEW_VERSION in a provider have +# to be done in the same PR +versions: + - 0.1.0 + +integrations: + - integration-name: Informatica + external-doc-url: https://www.informatica.com/ + logo: /docs/integration-logos/informatica.png + tags: [protocol] + +operators: + - integration-name: Informatica + python-modules: + - airflow.providers.informatica.operators.empty + +plugins: + - name: informatica + plugin-class: airflow.providers.informatica.plugins.InformaticaProviderPlugin + Review Comment: `provider.yaml` doesn’t list the hook module nor register a `connection-types` entry for `InformaticaEDCHook`, even though the provider implements a hook with `conn_type = "informatica_edc"`. 
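For concreteness, here is a hedged sketch of the two missing sections. The module and class paths are assumed from the hook file added in this PR (`hooks/edc.py`); the exact shape should be double-checked against an existing provider such as the jira one cited below:

```yaml
# Sketch only: paths assume the hook lives in hooks/edc.py as in this PR.
hooks:
  - integration-name: Informatica
    python-modules:
      - airflow.providers.informatica.hooks.edc

connection-types:
  - hook-class-name: airflow.providers.informatica.hooks.edc.InformaticaEDCHook
    connection-type: informatica_edc
```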
Other providers include both (e.g. `providers/atlassian/jira/provider.yaml:73-81`). Please add a `hooks:` section and a `connection-types:` entry so the hook/connection type is discoverable in docs/UI and consistent with the rest of the repo. ########## providers/informatica/pyproject.toml: ########## @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE OVERWRITTEN! 
+ +# IF YOU WANT TO MODIFY THIS FILE EXCEPT DEPENDENCIES, YOU SHOULD MODIFY THE TEMPLATE +# `pyproject_TEMPLATE.toml.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY +[build-system] +requires = ["flit_core==3.12.0"] +build-backend = "flit_core.buildapi" + +[project] +name = "apache-airflow-providers-informatica" +version = "0.1.0" +description = "Provider package apache-airflow-providers-informatica for Apache Airflow" +readme = "README.rst" +authors = [ + {name="Apache Software Foundation", email="[email protected]"}, +] +maintainers = [ + {name="Apache Software Foundation", email="[email protected]"}, +] +keywords = [ "airflow-provider", "informatica", "airflow", "integration" ] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Environment :: Console", + "Environment :: Web Environment", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "Framework :: Apache Airflow", + "Framework :: Apache Airflow :: Provider", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: System :: Monitoring", +] +requires-python = ">=3.10" + +# The dependencies should be modified in place in the generated file. +# Any change in the dependencies is preserved when the file is regenerated +# Make sure to run ``prek update-providers-dependencies --all-files`` +# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` +dependencies = [ + "apache-airflow>=3.0.0", + "apache-airflow-providers-common-compat>=1.12.0", + "apache-airflow-providers-http>=1.0.0", + "attrs>=22.2" +] Review Comment: This provider `pyproject.toml` is missing standard metadata present in other providers (e.g. `providers/http/pyproject.toml:31-32` has `license` and `license-files`). 
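For reference, the fields in question look roughly like this under `[project]` (values illustrative, not verified against the generated template; copy them from an existing provider such as the http one cited below rather than from this sketch):

```toml
# Illustrative sketch; take the exact values from providers/http/pyproject.toml.
license = "Apache-2.0"
license-files = ["LICENSE", "NOTICE"]
```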
Please add the same fields here for consistency. Also, `attrs>=22.2` appears unused by the provider code in this PR; consider removing it to avoid an unnecessary dependency. ########## providers/informatica/docs/changelog.rst: ########## @@ -0,0 +1,34 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. NOTE TO CONTRIBUTORS: + Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes + and you want to add an explanation to the users on how they are supposed to deal with them. + The changelog is updated and maintained semi-automatically by release manager. + +``apache-airflow-providers-informatica`` + + + +Changelog +========= + +1.0.0 Review Comment: The changelog entry header is `1.0.0`, but the provider package version introduced in this PR is `0.1.0` (see `providers/informatica/src/airflow/providers/informatica/__init__.py` and `providers/informatica/pyproject.toml`). Please align the changelog version with the actual provider version (or the repo’s provider-release conventions). 
```suggestion 0.1.0 ``` ########## dev/breeze/tests/test_selective_checks.py: ########## @@ -775,32 +775,32 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): [ { "description": "amazon...google", - "test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,common.compat,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]", + "test_types": "Providers[amazon] Providers[apache.livy,atlassian.jira,common.compat,dbt.cloud,dingding,discord,http,informatica,pagerduty] Providers[google]", } ] ), "individual-providers-test-types-list-as-strings-in-json": json.dumps( [ { - "description": "amazon...apache.livy", - "test_types": "Providers[amazon] Providers[apache.livy]", - }, + "description": "amazon...atlassian.jir", + "test_types": "Providers[amazon] Providers[apache.livy] Providers[atlassian.jira]" + }, { - "description": "atlassian.jir...common.compat", - "test_types": "Providers[atlassian.jira] Providers[common.compat]", - }, + "description": "common.compat...dbt.cloud", + "test_types": "Providers[common.compat] Providers[dbt.cloud]" + }, { - "description": "dbt.cloud...dingding", - "test_types": "Providers[dbt.cloud] Providers[dingding]", - }, + "description": "dingding...discord", + "test_types": "Providers[dingding] Providers[discord]" + }, { - "description": "discord...google", - "test_types": "Providers[discord] Providers[google]", - }, + "description": "google...http", + "test_types": "Providers[google] Providers[http]" + }, { - "description": "http...pagerduty", - "test_types": "Providers[http] Providers[pagerduty]", - }, + "description": "informatica...pagerduty", + "test_types": "Providers[informatica] Providers[pagerduty]" + } Review Comment: The expected value for `individual-providers-test-types-list-as-strings-in-json` looks inconsistent with the established pattern in this test file (e.g., descriptions are truncated: `amazon...atlassian.jir`, and the first entry’s `test_types` includes three Providers 
groups). This will likely not match the actual output of selective checks. Please update the expected JSON to reflect the real provider batching after adding `informatica` (and remove the trailing whitespace after the string literals). ########## providers/informatica/src/airflow/providers/informatica/plugins/informatica.py: ########## @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow.configuration import conf +from airflow.plugins_manager import AirflowPlugin + +is_disabled = conf.getboolean("informatica", "disabled", fallback=False) +# Conditional imports - only load expensive dependencies when plugin is enabled +if not is_disabled: + from airflow.lineage.hook import HookLineageReader + from airflow.providers.informatica.plugins.listener import get_informatica_listener + + +class InformaticaProviderPlugin(AirflowPlugin): + """ + Listener that emits numerous Events. + + Informatica Plugin provides listener that emits OL events on DAG start, + complete and failure and TaskInstances start, complete and failure. 
+ """ + + name: str = "InformaticaProviderPlugin" + + listeners: list = [get_informatica_listener()] if not is_disabled else [] + hook_lineage_readers: list = [HookLineageReader] if not is_disabled else [] + Review Comment: Provider plugins in this repo consistently import `AirflowPlugin`/`HookLineageReader` from `airflow.providers.common.compat.sdk` (see `providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py:19-26`). To match that convention and avoid relying on core-module import locations, please switch this plugin to use the compat SDK imports (and remove the trailing whitespace on the blank lines). ########## providers/informatica/tests/unit/informatica/plugins/test_informatica.py: ########## @@ -0,0 +1,62 @@ +"""Unit tests for Informatica provider plugin.""" +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import contextlib +import sys + +import pytest + +from tests_common import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES +from tests_common.test_utils.config import conf_vars + + [email protected]( + RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES, reason="Plugin initialization is done early in case of packages" +) +class TestInformaticaProviderPlugin: + def setup_method(self): + # Remove module under test if loaded already before. This lets us + # import the same source files for more than one test. + if "airflow.providers.informatica.plugins.informatica" in sys.modules: + del sys.modules["airflow.providers.informatica.plugins.informatica"] + + @pytest.mark.parametrize( + ("mocks", "expected"), + [ + # 1: not disabled by default + ([], 1), + # 0: conf disabled = true + ([conf_vars({("informatica", "disabled"): "True"})], 0), + # 0: conf disabled = 1 + ([conf_vars({("informatica", "disabled"): "1"})], 0), + # 1: conf disabled = false + ([conf_vars({("informatica", "disabled"): "False"})], 1), + # 1: conf disabled = 0 + ([conf_vars({("informatica", "disabled"): "0"})], 1), + ], + ) + def test_plugin_disablements(self, mocks, expected): + with contextlib.ExitStack() as stack: + for mock in mocks: + stack.enter_context(mock) + from airflow.providers.informatica.plugins.informatica import InformaticaProviderPlugin Review Comment: This test file uses tabs for indentation (e.g., in the `@pytest.mark.skipif`, `setup_method`, and `test_plugin_disablements` blocks). Airflow’s codebase expects spaces and this will typically fail formatting/lint checks. Please convert indentation to 4 spaces throughout the file. ########## providers/informatica/README.rst: ########## @@ -0,0 +1,183 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Apache Airflow Informatica Provider +=================================== + +This provider package contains integrations for `Informatica Enterprise Data Catalog (EDC) <https://www.informatica.com/products/data-governance/enterprise-data-catalog.html>`_ to work with Apache Airflow. + + +Features +-------- + +- **Airflow Integration**: Seamless integration with Airflow's lineage system using inlets and outlets. + + +Installation +------------ + +.. code-block:: bash + + pip install apache-airflow-providers-informatica + + + +Connection Setup +~~~~~~~~~~~~~~~~ + +Create an Informatica EDC connection in Airflow: + + #. **Connection Type**: ``http`` + #. **Host**: Your EDC server hostname + #. **Port**: EDC server port (typically 9087) + #. **Schema**: ``https`` or ``http`` + #. **Login**: EDC username + #. **Password**: EDC password + #. **Extras**: Add the following JSON: + + .. code-block:: json + + {"security_domain": "your_security_domain"} + +Configuration Options +~~~~~~~~~~~~~~~~~~~~~ + +Add to your ``airflow.cfg``: + +.. code-block:: ini + + [informatica] + # Disable sending events without uninstalling the Informatica Provider + disabled = False + # The connection ID to use when no connection ID is provided + default_conn_id = informatica_edc_default + + + +Complete DAG Example +~~~~~~~~~~~~~~~~~~~~ + +.. 
code-block:: python + + from airflow import DAG + from airflow.operators.python import PythonOperator + from datetime import datetime + + + def my_python_task(**kwargs): + print("Hello Informatica Lineage!") + + + with DAG( + dag_id="example_informatica_lineage_dag", + start_date=datetime(2024, 1, 1), + schedule_interval=None, + catchup=False, + ) as dag: + python_task = PythonOperator( + task_id="my_python_task", + python_callable=my_python_task, + inlets=[{"dataset_uri": "edc://object/source_table_abc123"}], + outlets=[{"dataset_uri": "edc://object/target_table_xyz789"}], + ) + python_task Review Comment: This example uses `schedule_interval` and imports `PythonOperator` from `airflow.operators.python`, which is inconsistent with the provider’s Airflow 3+ requirement and other examples in this PR. Please update the example to Airflow 3 style (`schedule=...`) and import `PythonOperator` from the Standard provider (`airflow.providers.standard.operators.python`). ########## providers/informatica/tests/unit/informatica/operators/test_empty.py: ########## @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest.mock import Mock + +import pytest Review Comment: Import of 'pytest' is not used. ########## providers/informatica/docs/conf.py: ########## @@ -0,0 +1,27 @@ +# Disable Flake8 because of all the sphinx imports +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Configuration of Providers docs building.""" + +from __future__ import annotations + +import os + +os.environ["AIRFLOW_PACKAGE_NAME"] = "apache-airflow-providers-informatica" + +from docs.provider_conf import * # noqa: F403 Review Comment: Import pollutes the enclosing namespace, as the imported module [docs.provider_conf](1) does not define '__all__'. ```suggestion from docs import provider_conf as _provider_conf for _name in dir(_provider_conf): if not _name.startswith("_"): globals()[_name] = getattr(_provider_conf, _name) ``` ########## providers/informatica/src/airflow/providers/informatica/hooks/edc.py: ########## @@ -0,0 +1,249 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import base64 +import re +from collections.abc import Mapping, MutableMapping +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from requests.exceptions import RequestException + +from airflow.configuration import conf +from airflow.providers.http.hooks.http import HttpHook + +if TYPE_CHECKING: + from requests import Response + + from airflow.providers.common.compat.sdk import Connection + + +class InformaticaEDCError(RuntimeError): + """Raised when the Informatica Enterprise Data Catalog API returns an error.""" + + +@dataclass(frozen=True) +class InformaticaConnectionConfig: + """Container for Informatica EDC connection settings.""" + + base_url: str + username: str | None + password: str | None + security_domain: str | None + verify_ssl: bool + request_timeout: int + provider_id: str + modified_by: str | None + + @property + def auth_header(self) -> str | None: + """Return the authorization header for the configured credentials.""" + if not self.username: + return None + + domain_prefix = f"{self.security_domain}\\" if self.security_domain else "" + credential = f"{domain_prefix}{self.username}:{self.password or ''}" + token = base64.b64encode(bytes(credential, "utf-8")).decode("utf-8") + return f"Basic {token}" + + +class InformaticaEDCHook(HttpHook): + """Hook providing a minimal client for the Informatica EDC REST API.""" + + 
conn_name_attr = "informatica_edc_conn_id" + default_conn_name = conf.get("informatica", "default_conn_id", fallback="informatica_edc_default") + conn_type = "informatica_edc" + hook_name = "Informatica EDC" + _lineage_association = "core.DataSetDataFlow" + + def __init__( + self, + informatica_edc_conn_id: str = default_conn_name, + *, + request_timeout: int | None = None, + **kwargs, + ) -> None: + super().__init__(http_conn_id=informatica_edc_conn_id, method="GET", **kwargs) + self._config: InformaticaConnectionConfig | None = None + self._request_timeout = request_timeout or conf.getint("informatica", "request_timeout", fallback=30) + + @property + def config(self) -> InformaticaConnectionConfig: + """Return cached connection configuration.""" + if self._config is None: + connection = self.get_connection(self.http_conn_id) + self._config = self._build_connection_config(connection) + return self._config + + def _build_connection_config(self, connection: Connection) -> InformaticaConnectionConfig: + """Build a configuration object from an Airflow connection.""" + host = connection.host or "" + schema = connection.schema or "https" + if host.startswith("http://") or host.startswith("https://"): + base_url = host + else: + base_url = f"{schema}://{host}" if host else f"{schema}://" + if connection.port: + base_url = f"{base_url}:{connection.port}" + + extras: MutableMapping[str, Any] = connection.extra_dejson or {} + verify_ssl_raw = extras.get("verify_ssl", extras.get("verify", True)) + verify_ssl = str(verify_ssl_raw).lower() not in {"0", "false", "no"} + + provider_id = str(extras.get("provider_id", "enrichment")) + modified_by = str(extras.get("modified_by", connection.login or "airflow")) + security_domain = extras.get("security_domain") or extras.get("domain") + + return InformaticaConnectionConfig( + base_url=base_url.rstrip("/"), + username=connection.login, + password=connection.password, + security_domain=str(security_domain) if security_domain else None, 
+ verify_ssl=verify_ssl, + request_timeout=self._request_timeout, + provider_id=provider_id, + modified_by=modified_by, + ) + + def close_session(self) -> None: + pass + + def get_conn( + self, + headers: dict[str, Any] | None = None, + extra_options: dict[str, Any] | None = None, + ) -> Any: + """Return a configured session augmented with Informatica specific headers.""" + session = super().get_conn(headers=headers, extra_options=extra_options) + session.verify = self.config.verify_ssl + session.headers.update({"Accept": "application/json", "Content-Type": "application/json"}) + if self.config.auth_header: + session.headers["Authorization"] = self.config.auth_header + return session + + def _build_url(self, endpoint: str) -> str: + endpoint = endpoint if endpoint.startswith("/") else f"/{endpoint}" + return f"{self.config.base_url}{endpoint}" + + def _request( + self, + method: str, + endpoint: str, + *, + params: Mapping[str, Any] | None = None, + json: Mapping[str, Any] | None = None, + ) -> Response: + """Execute an HTTP request and raise :class:`InformaticaEDCError` on failure.""" + url = self._build_url(endpoint) + session = self.get_conn() + try: + response = session.request( + method=method.upper(), + url=url, + params=params, + json=json, + timeout=self.config.request_timeout, + ) + except RequestException as exc: + raise InformaticaEDCError(f"Failed to call Informatica EDC endpoint {endpoint}: {exc}") from exc + + if response.ok: + return response + + message = response.text or response.reason + raise InformaticaEDCError( + f"Informatica EDC request to {endpoint} returned {response.status_code}: {message}" + ) + + def _encode_id(self, id, tilde=False): + """ + Encode an ID to be safe. Return String. + + Parameters + ---------- + id : String + ID of object + tilde : Boolean, optional (default=True) + Whether to encode with a tilde or percent sign. 
+ """ + # Replace three underscores with two backslashes + if ":___" in id: + id = id.replace(":___", "://") + + # Get REGEX set-up + regex = re.compile("([^a-zA-Z0-9-_])") + + # Initialize a few variables + id_lst = list(id) + idx = 0 + + # Replace each unsafe char with "~Hex(Byte(unsafe char))~" + while regex.search(id, idx) is not None: + idx = regex.search(id, idx).span()[1] + if tilde: + id_lst[idx - 1] = "~" + str(bytes(id_lst[idx - 1], "utf-8").hex()) + "~" + else: + id_lst[idx - 1] = "%" + str(bytes(id_lst[idx - 1], "utf-8").hex()) + + return "".join(id_lst) Review Comment: `_encode_id` has a couple of concrete issues: (1) the regex character class `[^a-zA-Z0-9-_]` treats `9-_` as a range, so many characters (e.g. `:`, `@`, `[`) will be considered “safe” and won’t be encoded; use `[^a-zA-Z0-9_-]` or escape the hyphen. (2) The docstring says `tilde` defaults to True, but the signature defaults to `False`. Please fix the regex and align the docstring/signature. ########## airflow-core/docs/extra-packages-ref.rst: ########## @@ -419,6 +419,8 @@ pre-installed when Airflow is installed. +---------------------+-----------------------------------------------------+--------------------------------------+--------------+ | ssh | ``pip install 'apache-airflow[ssh]'`` | SSH hooks and operators | | +---------------------+-----------------------------------------------------+--------------------------------------+--------------+ +| informatica | ``pip install 'apache-airflow[informatica]'`` | Informatica hooks and operators | | ++---------------------+-----------------------------------------------------+-----------------------------------------------------+ Review Comment: This ASCII table row is malformed: the separator line has a different number of columns than the rest of the table, which will break the docs build. 
Please keep the 4-column border/separator format consistent with surrounding rows when adding the `informatica` extra (including the trailing `|` column). ```suggestion +---------------------+-----------------------------------------------------+--------------------------------------+--------------+ ``` ########## providers/informatica/docs/guides/usage.rst: ########## @@ -0,0 +1,185 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Usage Guide +=========== + +The Informatica provider enables automatic lineage tracking for Airflow tasks that define inlets and outlets. + +How It Works +------------ + +The Informatica plugin automatically detects tasks with lineage support and sends inlet/outlet information to Informatica EDC when tasks succeed. No additional configuration is required beyond defining inlets and outlets in your tasks. 
+ +Key Features +------------ + +- **Automatic Lineage Detection**: Plugin automatically detects tasks with lineage support +- **EDC Integration**: Native REST API integration with Informatica Enterprise Data Catalog +- **Transparent Operation**: No code changes required beyond inlet/outlet definitions +- **Error Handling**: Robust error handling for API failures and invalid objects +- **Configurable**: Extensive configuration options for different environments + +Architecture +------------ + +The provider consists of several key components: + +**Hooks** + ``InformaticaEDCHook`` provides low-level EDC API access for authentication, object retrieval, and lineage creation. + +**Extractors** + ``InformaticaLineageExtractor`` handles lineage data extraction and conversion to Airflow-compatible formats. + +**Plugins** + ``InformaticaProviderPlugin`` registers listeners that monitor task lifecycle events and trigger lineage operations. + +**Operators** + ``EmptyOperator``: A minimal operator included for basic lineage testing and integration. It supports inlets and outlets, allowing you to verify lineage extraction and EDC integration without custom logic. + +**Listeners** + Event-driven listeners that respond to task success/failure events and process lineage information. + + +Requirements +------------ + +- Apache Airflow 3.0+ +- Access to Informatica Enterprise Data Catalog instance +- Valid EDC credentials with API access permissions + + +Quick Start +----------- + +1. **Install the provider:** + + .. code-block:: bash + + pip install apache-airflow-providers-informatica + +2. **Configure connection:** + + Create an HTTP connection in Airflow UI with EDC server details and security domain in extras. + +3. **Add lineage to tasks:** + + Define inlets and outlets in your tasks using EDC object URIs. + +4. **Run your DAG:** + + The provider automatically handles lineage extraction when tasks succeed. + + +Example DAG +----------- + +.. 
code-block:: python + + from airflow import DAG + from airflow.providers.standard.operators.python import PythonOperator + from datetime import datetime + + + def my_python_task(**kwargs): + print("Hello Informatica Lineage!") + + + with DAG( + dag_id="example_informatica_lineage_dag", + start_date=datetime(2024, 1, 1), + schedule_interval=None, Review Comment: The example DAG uses `schedule_interval`, which is deprecated in modern Airflow and inconsistent with the provider’s Airflow 3+ requirement. Please update to the Airflow 3-style `schedule=` argument in this documentation example. ```suggestion schedule=None, ``` ########## providers/informatica/tests/unit/informatica/hooks/test_edc.py: ########## @@ -0,0 +1,158 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.informatica.hooks.edc import InformaticaEDCHook
+
+
[email protected]
+def hook():
+    return InformaticaEDCHook(informatica_edc_conn_id="test_conn")
+
+
+@patch("airflow.providers.informatica.hooks.edc.HttpHook.get_connection")
+def test_config_property_and_build_connection_config(mock_get_connection, hook):
+    """Test config property and _build_connection_config method."""
+    mock_conn = MagicMock()
+    mock_conn.host = "testhost"
+    mock_conn.schema = "https"
+    mock_conn.port = 443
+    mock_conn.login = "user"
+    mock_conn.password = "pass"
+    mock_conn.extra_dejson = {
+        "verify_ssl": True,
+        "provider_id": "test_provider",
+        "modified_by": "tester",
+        "security_domain": "domain",
+    }
+    mock_get_connection.return_value = mock_conn
+    config = hook.config
+    assert config.base_url == "https://testhost:443"
+    assert config.username == "user"
+    assert config.password == "pass"
+    assert config.security_domain == "domain"
+    assert config.provider_id == "test_provider"
+    assert config.modified_by == "tester"
+    assert config.verify_ssl is True
+    assert isinstance(config.request_timeout, int)
+    assert config.auth_header.startswith("Basic ")
+
+
+@patch("airflow.providers.informatica.hooks.edc.HttpHook.get_connection")
+@patch("airflow.providers.informatica.hooks.edc.HttpHook.get_conn")
+def test_get_conn_headers_and_verify(mock_get_conn, mock_get_connection, hook):
+    """Test get_conn sets headers and verify."""
+    mock_conn = MagicMock()
+    mock_conn.host = "testhost"
+    mock_conn.schema = "https"
+    mock_conn.port = 443
+    mock_conn.login = "user"
+    mock_conn.password = "pass"
+    mock_conn.extra_dejson = {"verify_ssl": True}
+    mock_get_connection.return_value = mock_conn
+    mock_session = MagicMock()
+    mock_session.headers = {}
+    mock_get_conn.return_value = mock_session
+    session = hook.get_conn()
+    assert "Accept" in session.headers
+    assert "Content-Type" in session.headers
+    assert "Authorization" in session.headers
+    assert session.verify is True
+
+
+def test_build_url(hook):
+    """Test _build_url method."""
+    hook._config = MagicMock(base_url="http://test")
+    url = hook._build_url("endpoint")
+    assert url == "http://test/endpoint"
+    url2 = hook._build_url("/endpoint")
+    assert url2 == "http://test/endpoint"
+
+
+@patch("airflow.providers.informatica.hooks.edc.InformaticaEDCHook.get_conn")
+def test_request_success_and_error(mock_get_conn, hook):
+    """Test _request method for success and error cases."""
+    mock_session = MagicMock()
+    mock_response = MagicMock()
+    mock_response.ok = True
+    mock_response.status_code = 200
+    mock_response.text = ""
+    mock_response.json.return_value = {"result": "ok"}
+    mock_session.request.return_value = mock_response
+    mock_get_conn.return_value = mock_session
+    hook._config = MagicMock(base_url="http://test", request_timeout=10)
+    resp = hook._request("GET", "endpoint")
+    assert resp.json() == {"result": "ok"}
+
+    # Error case
+    mock_response.ok = False
+    mock_response.status_code = 400
+    mock_response.text = "Bad Request"
+    mock_session.request.return_value = mock_response
+    try:
+        hook._request("GET", "endpoint")
+        pytest.fail("Expected exception was not raised")
+    except Exception:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.

##########
providers/informatica/tests/unit/informatica/hooks/test_edc.py:
##########
@@ -0,0 +1,158 @@
+        pass
+
+
+def test_encode_id(hook):
+    """Test _encode_id method for tilde and percent encoding."""
+    # ID with unsafe chars
+    unsafe_id = "table:___name/unsafe"
+    encoded = hook._encode_id(unsafe_id, tilde=True)
+    assert "~" in encoded
+    encoded_percent = hook._encode_id(unsafe_id, tilde=False)
+    assert "%" in encoded_percent
+
+
+@patch("airflow.providers.informatica.hooks.edc.InformaticaEDCHook._request")
+def test_get_object(mock_request, hook):
+    """Test get_object method."""
+    mock_request.return_value.json.return_value = {"id": "table://database/schema/safe", "name": "test"}
+    hook._config = MagicMock(base_url="http://test", request_timeout=10)
+    obj = hook.get_object("table://database/schema/safe")
+    assert obj["id"] == "table://database/schema/safe"
+    assert obj["name"] == "test"
+
+
+@patch("airflow.providers.informatica.hooks.edc.InformaticaEDCHook._request")
+def test_create_lineage_link(mock_request, hook):
+    """Test create_lineage_link method and error for same source/target."""
+    hook._config = MagicMock(
+        base_url="http://test", provider_id="prov", modified_by="mod", request_timeout=10
+    )
+    mock_request.return_value.content = b'{"success": true}'
+    mock_request.return_value.json.return_value = {"success": True}
+    result = hook.create_lineage_link("src_id", "tgt_id")
+    assert result["success"] is True
+    # Error for same source/target
+    try:
+        hook.create_lineage_link("same_id", "same_id")
+        pytest.fail("Expected exception was not raised")
+    except Exception:

Review Comment:
   'except' clause does nothing but pass and there is no explanatory comment.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
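Both "'except' clause does nothing but pass" comments point at the same try/fail/except pattern. A minimal sketch of the idiomatic replacement the comments suggest, using `pytest.raises`; note that `create_lineage_link` here is a hypothetical stand-in, not the provider's actual hook method:

```python
import pytest


def create_lineage_link(source_id: str, target_id: str) -> dict:
    """Hypothetical stand-in for the hook method under test."""
    if source_id == target_id:
        raise ValueError("source and target must differ")
    return {"success": True}


def test_create_lineage_link_rejects_same_ids():
    # pytest.raises asserts that the exception is raised and fails
    # the test automatically if it is not -- no try/except/pass and
    # no explicit pytest.fail() call needed.
    with pytest.raises(ValueError, match="must differ"):
        create_lineage_link("same_id", "same_id")
```

The `match=` argument additionally pins the error message, which catches cases where a different `ValueError` is raised than the one the test intends to exercise.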
