ashb commented on code in PR #29940:
URL: https://github.com/apache/airflow/pull/29940#discussion_r1152067626


##########
airflow/providers/openlineage/extractors/__init__.py:
##########
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
+from airflow.providers.openlineage.extractors.manager import ExtractorManager
+
+__all__ = ["BaseExtractor", "OperatorLineage", "ExtractorManager"]  # type: 
ignore

Review Comment:
   ```suggestion
   __all__ = ["BaseExtractor", "OperatorLineage", "ExtractorManager"]
   ```
   Shouldn't be needed anymore



##########
airflow/providers/openlineage/extractors/base.py:
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+
+from attrs import Factory, define
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+from openlineage.client.facet import BaseFacet
+from openlineage.client.run import Dataset
+
+
+@define
+class OperatorLineage:
+    """Structure returned from lineage extraction."""
+
+    inputs: list[Dataset] = Factory(list)
+    outputs: list[Dataset] = Factory(list)
+    run_facets: dict[str, BaseFacet] = Factory(dict)
+    job_facets: dict[str, BaseFacet] = Factory(dict)
+
+
+class BaseExtractor(ABC, LoggingMixin):
+    """
+    Abstract base extractor class.
+
+    This is used mostly to maintain support for custom extractors.
+    """
+
+    _allowed_query_params: list[str] = []
+
+    def __init__(self, operator):
+        super().__init__()
+        self.operator = operator
+        self.patch()
+
+    def patch(self):
+        # Extractor should register extension methods or patches to operator here
+        pass
+
+    @classmethod
+    @abstractmethod
+    def get_operator_classnames(cls) -> list[str]:
+        """
+        Implement this method returning list of operators that extractor works for.

Review Comment:
   ```suggestion
        Implement this method returning list of operators that extractor works for.
           
   ```
   
   Good practice is to have a blank line between the synopsis/short desc/first sentence and the rest. It only matters if these docs get rendered via Sphinx, but when they do, it shows a nice short description in the list.
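
   A minimal sketch of the shape this ends up as (illustrative only, showing where the blank line goes):
   ```python
   def get_operator_classnames(cls) -> list[str]:
       """
       Implement this method returning list of operators that extractor works for.

       The rest of the description goes here, after the blank line, so Sphinx
       only picks up the first sentence as the short summary.
       """
   ```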



##########
airflow/providers/openlineage/extractors/manager.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+
+from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
+from airflow.providers.openlineage.extractors.base import DefaultExtractor
+from airflow.providers.openlineage.plugins.facets import (
+    UnknownOperatorAttributeRunFacet,
+    UnknownOperatorInstance,
+)
+from airflow.providers.openlineage.utils.utils import get_operator_class
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class ExtractorManager(LoggingMixin):
+    """Class abstracting management of custom extractors."""
+
+    def __init__(self):
+        super().__init__()
+        self.extractors: dict[str, type[BaseExtractor]] = {}
+        self.default_extractor = DefaultExtractor
+
+        # Comma-separated extractors in OPENLINEAGE_EXTRACTORS variable.
+        # Extractors should implement BaseExtractor
+        from airflow.providers.openlineage.utils import import_from_string
+
+        # TODO: use airflow config with OL backup
+        env_extractors = os.getenv("OPENLINEAGE_EXTRACTORS")
+        if env_extractors is not None:
+            for extractor in env_extractors.split(";"):
+                extractor: type[BaseExtractor] = import_from_string(extractor.strip())
+                for operator_class in extractor.get_operator_classnames():
+                    self.extractors[operator_class] = extractor
+
+    def add_extractor(self, operator: str, extractor: type[BaseExtractor]):
+        self.extractors[operator] = extractor
+
+    def extract_metadata(self, dagrun, task, complete: bool = False, task_instance=None) -> OperatorLineage:
+        extractor = self._get_extractor(task)
+        task_info = (
+            f"task_type={task.task_type} "
+            f"airflow_dag_id={task.dag_id} "
+            f"task_id={task.task_id} "
+            f"airflow_run_id={dagrun.run_id} "
+        )
+
+        if extractor:
+            # Extracting advanced metadata is only possible when extractor for particular operator
+            # is defined. Without it, we can't extract any input or output data.
+            try:
+                self.log.debug("Using extractor %s %s", 
extractor.__class__.__name__, str(task_info))
+                if complete:
+                    task_metadata = extractor.extract_on_complete(task_instance)
+                else:
+                    task_metadata = extractor.extract()
+
+                self.log.debug("Found task metadata for operation %s: %s", 
task.task_id, str(task_metadata))
+                if task_metadata:
+                    if (not task_metadata.inputs) and (not task_metadata.outputs):
+                        inlets = task.get_inlet_defs()
+                        outlets = task.get_outlet_defs()
+                        self.extract_inlets_and_outlets(task_metadata, inlets, outlets)
+
+                    return task_metadata
+
+            except Exception as e:
+                self.log.exception(
+                    "Failed to extract metadata using found extractor %s - %s 
%s", extractor, e, task_info
+                )
+        else:
+            self.log.debug("Unable to find an extractor %s", task_info)
+
+            # Only include the unkonwnSourceAttribute facet if there is no extractor
+            task_metadata = OperatorLineage(
+                run_facets={
+                    "unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
+                        unknownItems=[
+                            UnknownOperatorInstance(
+                                name=get_operator_class(task).__name__,
+                                properties={attr: value for attr, value in task.__dict__.items()},
+                            )
+                        ]
+                    )
+                },
+            )
+            inlets = task.get_inlet_defs()
+            outlets = task.get_outlet_defs()

Review Comment:
   These are deprecated now
   ```suggestion
               inlets = task.inlets
               outlets = task.outlets
   ```
   
   (Added/changed in #25767, which was 2.4. So maybe this is fine as it is?)



##########
airflow/providers/openlineage/extractors/manager.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+
+from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
+from airflow.providers.openlineage.extractors.base import DefaultExtractor
+from airflow.providers.openlineage.plugins.facets import (
+    UnknownOperatorAttributeRunFacet,
+    UnknownOperatorInstance,
+)
+from airflow.providers.openlineage.utils.utils import get_operator_class
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class ExtractorManager(LoggingMixin):
+    """Class abstracting management of custom extractors."""
+
+    def __init__(self):
+        super().__init__()
+        self.extractors: dict[str, type[BaseExtractor]] = {}
+        self.default_extractor = DefaultExtractor
+
+        # Comma-separated extractors in OPENLINEAGE_EXTRACTORS variable.
+        # Extractors should implement BaseExtractor
+        from airflow.providers.openlineage.utils import import_from_string
+
+        # TODO: use airflow config with OL backup
+        env_extractors = os.getenv("OPENLINEAGE_EXTRACTORS")

Review Comment:
   Just making a note so we don't forget to do this
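
   Rough sketch of what the TODO could become (the section/key names here are placeholders, not a decided config):
   ```python
   from airflow.configuration import conf

   # hypothetical [openlineage] section; fall back to the legacy env var
   env_extractors = conf.get("openlineage", "extractors", fallback=os.getenv("OPENLINEAGE_EXTRACTORS"))
   ```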



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowMappedTaskRunFacet,
+    AirflowRunArgsRunFacet,
+    AirflowRunFacet,
+    AirflowVersionRunFacet,
+)
+
+# TODO: move this maybe to Airflow's logic?
+from openlineage.client.utils import RedactMixin
+
+if TYPE_CHECKING:
+    from airflow.models import DAG, BaseOperator, Connection, DagRun, TaskInstance
+
+
+log = logging.getLogger(__name__)
+_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+def openlineage_job_name(dag_id: str, task_id: str) -> str:
+    return f"{dag_id}.{task_id}"
+
+
+def get_operator_class(task: BaseOperator) -> type:
+    if task.__class__.__name__ in ("DecoratedMappedOperator", 
"MappedOperator"):
+        return task.operator_class
+    return task.__class__

Review Comment:
   Duck-typing  is more the python way, and is likely more future-proof
   ```suggestion
       return getattr(task, "operator_class", task.__class__)
   ```



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowMappedTaskRunFacet,
+    AirflowRunArgsRunFacet,
+    AirflowRunFacet,
+    AirflowVersionRunFacet,
+)
+
+# TODO: move this maybe to Airflow's logic?
+from openlineage.client.utils import RedactMixin
+
+if TYPE_CHECKING:
+    from airflow.models import DAG, BaseOperator, Connection, DagRun, TaskInstance
+
+
+log = logging.getLogger(__name__)
+_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+def openlineage_job_name(dag_id: str, task_id: str) -> str:
+    return f"{dag_id}.{task_id}"
+
+
+def get_operator_class(task: BaseOperator) -> type:
+    if task.__class__.__name__ in ("DecoratedMappedOperator", 
"MappedOperator"):
+        return task.operator_class
+    return task.__class__
+
+
+def to_json_encodable(task: BaseOperator) -> dict[str, object]:
+    def _task_encoder(obj):
+        if isinstance(obj, datetime.datetime):
+            return obj.isoformat()
+        elif isinstance(obj, AIRFLOW_DAG):
+            return {
+                "dag_id": obj.dag_id,
+                "tags": obj.tags,
+                "schedule_interval": obj.schedule_interval,

Review Comment:
   What about timetables? What about Dataset-triggered dags?
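
   A sketch of what serializing via the timetable could look like (`timetable.serialize()` is what `DagInfo` further down already uses; whether that captures Dataset-triggered DAGs well enough is an open question):
   ```python
   elif isinstance(obj, AIRFLOW_DAG):
       return {
           "dag_id": obj.dag_id,
           "tags": obj.tags,
           # timetable covers cron/timedelta schedules as well as custom timetables
           "timetable": obj.timetable.serialize(),
       }
   ```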



##########
airflow/providers/openlineage/plugins/facets.py:
##########
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from attrs import define
+
+from airflow.providers.openlineage import version as OPENLINEAGE_AIRFLOW_VERSION
+from airflow.version import version as AIRFLOW_VERSION
+from openlineage.client.facet import BaseFacet
+from openlineage.client.utils import RedactMixin
+
+
+@define(slots=False)
+class AirflowVersionRunFacet(BaseFacet):
+    """Run facet containing task and DAG info"""
+
+    operator: str
+    taskInfo: dict[str, object]
+    airflowVersion: str
+    openlineageAirflowVersion: str
+
+    _additional_skip_redact: list[str] = [
+        "operator",
+        "airflowVersion",
+        "openlineageAirflowVersion",
+    ]
+
+    @classmethod
+    def from_dagrun_and_task(cls, dagrun, task):
+        # task.__dict__ may contain values uncastable to str
+        from airflow.providers.openlineage.utils import get_operator_class, to_json_encodable
+
+        task_info = to_json_encodable(task)

Review Comment:
   This serializes every property of the task -- that seems a bit overkill? I guess it's not too bad though, as it only goes one level down, not recursively into things like `self.conn` if a task has it.



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG

Review Comment:
   Not a constant (which is what the case implies) so either leave the import 
as `DAG` or make it `AirflowDag` please



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowMappedTaskRunFacet,
+    AirflowRunArgsRunFacet,
+    AirflowRunFacet,
+    AirflowVersionRunFacet,
+)
+
+# TODO: move this maybe to Airflow's logic?
+from openlineage.client.utils import RedactMixin
+
+if TYPE_CHECKING:
+    from airflow.models import DAG, BaseOperator, Connection, DagRun, TaskInstance
+
+
+log = logging.getLogger(__name__)
+_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+def openlineage_job_name(dag_id: str, task_id: str) -> str:
+    return f"{dag_id}.{task_id}"
+
+
+def get_operator_class(task: BaseOperator) -> type:
+    if task.__class__.__name__ in ("DecoratedMappedOperator", 
"MappedOperator"):
+        return task.operator_class
+    return task.__class__
+
+
+def to_json_encodable(task: BaseOperator) -> dict[str, object]:
+    def _task_encoder(obj):
+        if isinstance(obj, datetime.datetime):
+            return obj.isoformat()
+        elif isinstance(obj, AIRFLOW_DAG):
+            return {
+                "dag_id": obj.dag_id,
+                "tags": obj.tags,
+                "schedule_interval": obj.schedule_interval,
+            }
+        else:
+            return str(obj)
+
+    return json.loads(json.dumps(task.__dict__, default=_task_encoder))
+
+
+def url_to_https(url) -> str | None:
+    # Ensure URL exists
+    if not url:
+        return None
+
+    base_url = None
+    if url.startswith("git@"):
+        part = url.split("git@")[1:2]
+        if part:
+            base_url = f'https://{part[0].replace(":", "/", 1)}'
+    elif url.startswith("https://";):
+        base_url = url
+
+    if not base_url:
+        raise ValueError(f"Unable to extract location from: {url}")
+
+    if base_url.endswith(".git"):
+        base_url = base_url[:-4]
+    return base_url
+
+
+def get_location(file_path) -> str | None:
+    # Ensure file path exists
+    if not file_path:
+        return None
+
+    # move to the file directory
+    abs_path = os.path.abspath(file_path)
+    file_name = os.path.basename(file_path)
+    cwd = os.path.dirname(abs_path)
+
+    # get the repo url
+    repo_url = execute_git(cwd, ["config", "--get", "remote.origin.url"])
+
+    # get the repo relative path
+    repo_relative_path = execute_git(cwd, ["rev-parse", "--show-prefix"])
+
+    # get the commitId for the particular file
+    commit_id = execute_git(cwd, ["rev-list", "HEAD", "-1", "--", file_name])
+
+    # build the URL
+    base_url = url_to_https(repo_url)
+    if not base_url:
+        return None
+
+    return f"{base_url}/blob/{commit_id}/{repo_relative_path}{file_name}"
+
+
+def get_task_location(task):
+    try:
+        if hasattr(task, "file_path") and task.file_path:
+            return get_location(task.file_path)
+        else:
+            return get_location(task.dag.fileloc)
+    except Exception:
+        return None
+
+
+def execute_git(cwd, params):
+    p = subprocess.Popen(["git"] + params, cwd=cwd, stdout=subprocess.PIPE, 
stderr=None)
+    p.wait(timeout=0.5)
+    out, err = p.communicate()
+    return out.decode("utf8").strip()
+
+
+def redacted_connection_uri(conn: Connection, filtered_params=None, filtered_prefixes=None):
+    """
+    Return the connection URI for the given Connection.
+    This method additionally filters URI by removing query parameters that are known to carry sensitive data
+    like username, password, access key.
+    """
+    if filtered_prefixes is None:
+        filtered_prefixes = []
+    if filtered_params is None:
+        filtered_params = []
+
+    def filter_key_params(k: str):
+        return k not in filtered_params and any(substr in k for substr in filtered_prefixes)
+
+    conn_uri = conn.get_uri()
+    parsed = urlparse(conn_uri)
+
+    # Remove username and password
+    netloc = f"{parsed.hostname}" + (f":{parsed.port}" if parsed.port else "")
+    parsed = parsed._replace(netloc=netloc)
+    if parsed.query:
+        query_dict = dict(parse_qsl(parsed.query))
+        if conn.EXTRA_KEY in query_dict:
+            query_dict = json.loads(query_dict[conn.EXTRA_KEY])
+        filtered_qs = {k: v for k, v in query_dict.items() if not filter_key_params(k)}
+        parsed = parsed._replace(query=urlencode(filtered_qs))
+    return urlunparse(parsed)
+
+
+def get_normalized_postgres_connection_uri(conn):
+    """
+    URIs starting with postgresql:// and postgres:// are both valid
+    PostgreSQL connection strings. This function normalizes it to
+    postgres:// as canonical name according to OpenLineage spec.
+    """
+    uri = redacted_connection_uri(conn)
+    if uri.startswith("postgresql"):
+        uri = uri.replace("postgresql", "postgres", 1)
+    return uri
+
+
+def get_connection(conn_id) -> Connection | None:
+    from airflow.hooks.base import BaseHook
+
+    with suppress(Exception):
+        return BaseHook.get_connection(conn_id=conn_id)
+    return None
+
+
+def get_job_name(task):
+    return f"{task.dag_id}.{task.task_id}"
+
+
+def get_custom_facets(
+    dagrun, task, is_external_trigger: bool, task_instance: TaskInstance | None = None
+) -> dict[str, Any]:
+    custom_facets = {
+        "airflow_runArgs": AirflowRunArgsRunFacet(is_external_trigger),
+        "airflow_version": AirflowVersionRunFacet.from_dagrun_and_task(dagrun, 
task),
+    }
+    # check for -1 comes from SmartSensor compatibility with dynamic task mapping
+    # this comes from Airflow code
+    if hasattr(task_instance, "map_index") and getattr(task_instance, 
"map_index") != -1:
+        custom_facets["airflow_mappedTask"] = 
AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+    return custom_facets
+
+
+class InfoJsonEncodable(dict):
+    """
+    Airflow objects might not be json-encodable overall.
+
+    The class provides additional attributes to control
+    what and how is encoded:
+    * renames: a dictionary of attribute name changes
+    * casts: a dictionary consisting of attribute names
+             and corresponding methods that should change
+             object value
+    * includes: list of attributes to be included in encoding
+    * excludes: list of attributes to be excluded from encoding
+
+    Don't use both includes and excludes.
+    """
+
+    renames: dict[str, str] = {}
+    casts: dict[str, Any] = {}
+    includes: list[str] = []
+    excludes: list[str] = []
+
+    def __init__(self, obj):
+        self.obj = obj
+        self._fields = []
+
+        self._cast_fields()
+        self._rename_fields()
+        self._include_fields()
+        dict.__init__(
+            self,
+            **{field: InfoJsonEncodable._cast_basic_types(getattr(self, field)) for field in self._fields},
+        )
+
+    @staticmethod
+    def _cast_basic_types(value):
+        if isinstance(value, datetime.datetime):
+            return value.isoformat()
+        if isinstance(value, (set, list, tuple)):
+            return str(list(value))
+        return value
+
+    def _rename_fields(self):
+        for field, renamed in self.renames.items():
+            if hasattr(self.obj, field):
+                setattr(self, renamed, getattr(self.obj, field))
+                self._fields.append(renamed)
+
+    def _cast_fields(self):
+        for field, func in self.casts.items():
+            setattr(self, field, func(self.obj))
+            self._fields.append(field)
+
+    def _include_fields(self):
+        if self.includes and self.excludes:
+            raise Exception("Don't use both includes and excludes.")
+        if self.includes:
+            for field in self.includes:
+                if field in self._fields or not hasattr(self.obj, field):
+                    continue
+                setattr(self, field, getattr(self.obj, field))
+                self._fields.append(field)
+        else:
+            for field, val in self.obj.__dict__.items():
+                if field in self._fields or field in self.excludes or field in self.renames:
+                    continue
+                setattr(self, field, val)
+                self._fields.append(field)
+
+
+class DagInfo(InfoJsonEncodable):
+    """Defines encoding DAG object to JSON."""
+
+    includes = ["dag_id", "schedule_interval", "tags", "start_date"]
+    casts = {"timetable": lambda dag: dag.timetable.serialize() if 
getattr(dag, "timetable", None) else None}
+    renames = {"_dag_id": "dag_id"}
+
+
+class DagRunInfo(InfoJsonEncodable):
+    """Defines encoding DagRun object to JSON."""
+
+    includes = [
+        "conf",
+        "dag_id",
+        "data_interval_start",
+        "data_interval_end",
+        "external_trigger",
+        "run_id",
+        "run_type",
+        "start_date",
+    ]
+
+
+class TaskInstanceInfo(InfoJsonEncodable):
+    """Defines encoding TaskInstance object to JSON."""
+
+    includes = ["duration", "try_number", "pool"]
+    casts = {
+        "map_index": lambda ti: ti.map_index
+        if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1
+        else None
+    }
+
+
+class TaskInfo(InfoJsonEncodable):
+    """Defines encoding BaseOperator/AbstractOperator object to JSON."""
+
+    renames = {
+        "_BaseOperator__init_kwargs": "args",
+        "_BaseOperator__from_mapped": "mapped",
+        "_downstream_task_ids": "downstream_task_ids",
+        "_upstream_task_ids": "upstream_task_ids",
+    }
+    excludes = [
+        "_BaseOperator__instantiated",
+        "_dag",
+        "_hook",
+        "_log",
+        "_outlets",
+        "_inlets",
+        "_lock_for_execution",
+        "handler",
+        "params",
+        "python_callable",
+        "retry_delay",
+    ]
+    casts = {
+        "operator_class": lambda task: 
f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}",  
# noqa
+        "task_group": lambda task: TaskGroupInfo(task.task_group)
+        if hasattr(task, "task_group") and getattr(task.task_group, 
"_group_id", None)
+        else None,
+    }
+
+
+class TaskGroupInfo(InfoJsonEncodable):
+    """Defines encoding TaskGroup object to JSON."""
+
+    renames = {
+        "_group_id": "group_id",
+    }
+    includes = [
+        "downstream_group_ids",
+        "downstream_task_ids",
+        "prefix_group_id",
+        "tooltip",
+        "upstream_group_ids",
+        "upstream_task_ids",
+    ]
+
+
+def get_airflow_run_facet(
+    dag_run: DagRun,
+    dag: DAG,
+    task_instance: TaskInstance,
+    task: BaseOperator,
+    task_uuid: str,
+):
+    return {
+        "airflow": json.loads(
+            json.dumps(
+                asdict(
+                    AirflowRunFacet(
+                        dag=DagInfo(dag),
+                        dagRun=DagRunInfo(dag_run),
+                        taskInstance=TaskInstanceInfo(task_instance),
+                        task=TaskInfo(task),
+                        taskUuid=task_uuid,
+                    )
+                ),
+                default=str,
+            )
+        )
+    }
+
+
+def import_from_string(path: str):
+    try:
+        module_path, target = path.rsplit(".", 1)
+        module = importlib.import_module(module_path)
+        return getattr(module, target)
+    except Exception as e:
+        log.warning(e)
+        raise ImportError(f"Failed to import {path}") from e

Review Comment:
   The same as `from airflow.utils.module_loading import import_string`? Can we 
use that instead?
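
   i.e. roughly (the dotted path is just an example):
   ```python
   from airflow.utils.module_loading import import_string

   extractor_cls = import_string("my_company.extractors.CustomExtractor")
   ```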



##########
airflow/providers/openlineage/extractors/base.py:
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+
+from attrs import Factory, define
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+from openlineage.client.facet import BaseFacet
+from openlineage.client.run import Dataset
+
+
+@define
+class OperatorLineage:
+    """Structure returned from lineage extraction."""
+
+    inputs: list[Dataset] = Factory(list)
+    outputs: list[Dataset] = Factory(list)
+    run_facets: dict[str, BaseFacet] = Factory(dict)
+    job_facets: dict[str, BaseFacet] = Factory(dict)
+
+
+class BaseExtractor(ABC, LoggingMixin):
+    """
+    Abstract base extractor class.
+
+    This is used mostly to maintain support for custom extractors.
+    """
+
+    _allowed_query_params: list[str] = []
+
+    def __init__(self, operator):

Review Comment:
   We should probably start adding typing hints to this. In this case I guess it's?
   
   ```suggestion
       def __init__(self, operator: BaseOperator):
   ```
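
   (That would also need the name importable for type checkers, e.g. something like:)
   ```python
   from typing import TYPE_CHECKING

   if TYPE_CHECKING:
       from airflow.models import BaseOperator
   ```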



##########
tests/providers/openlineage/conftest.py:
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Copyright 2018-2023 contributors to the OpenLineage project
+# SPDX-License-Identifier: Apache-2.0
+from __future__ import annotations
+
+import logging
+import os
+from unittest.mock import patch
+
+import pytest
+
+log = logging.getLogger(__name__)
+
+
[email protected](scope="function")
+def remove_redshift_conn():
+    if "REDSHIFT_CONN" in os.environ:
+        del os.environ["REDSHIFT_CONN"]
+    if "WRITE_SCHEMA" in os.environ:
+        del os.environ["WRITE_SCHEMA"]
+
+
[email protected](scope="function")
+def we_module_env():
+    os.environ["REDSHIFT_CONN"] = "postgresql://user:[email protected]:1234/db"
+    os.environ["WRITE_SCHEMA"] = "testing"
+
+
[email protected](scope="function")
+def dagbag():
+    log.debug("dagbag()")
+    os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = "sqlite://"

Review Comment:
   Airflow tests generally shouldn't set this -- Airflow will (I think) already 
be loaded, so this might not actually have any effect, but regardless, the 
airflow test suite sets up the DB.



##########
airflow/providers/openlineage/extractors/manager.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+
+from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
+from airflow.providers.openlineage.extractors.base import DefaultExtractor
+from airflow.providers.openlineage.plugins.facets import (
+    UnknownOperatorAttributeRunFacet,
+    UnknownOperatorInstance,
+)
+from airflow.providers.openlineage.utils.utils import get_operator_class
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class ExtractorManager(LoggingMixin):
+    """Class abstracting management of custom extractors."""
+
+    def __init__(self):
+        super().__init__()
+        self.extractors: dict[str, type[BaseExtractor]] = {}
+        self.default_extractor = DefaultExtractor
+
+        # Comma-separated extractors in OPENLINEAGE_EXTRACTORS variable.
+        # Extractors should implement BaseExtractor
+        from airflow.providers.openlineage.utils import import_from_string
+
+        # TODO: use airflow config with OL backup
+        env_extractors = os.getenv("OPENLINEAGE_EXTRACTORS")
+        if env_extractors is not None:
+            for extractor in env_extractors.split(";"):
+                extractor: type[BaseExtractor] = import_from_string(extractor.strip())
+                for operator_class in extractor.get_operator_classnames():
+                    self.extractors[operator_class] = extractor
+
+    def add_extractor(self, operator: str, extractor: type[BaseExtractor]):
+        self.extractors[operator] = extractor
+
+    def extract_metadata(self, dagrun, task, complete: bool = False, task_instance=None) -> OperatorLineage:
+        extractor = self._get_extractor(task)
+        task_info = (
+            f"task_type={task.task_type} "
+            f"airflow_dag_id={task.dag_id} "
+            f"task_id={task.task_id} "
+            f"airflow_run_id={dagrun.run_id} "
+        )
+
+        if extractor:
+            # Extracting advanced metadata is only possible when extractor for particular operator
+            # is defined. Without it, we can't extract any input or output data.
+            try:
+                self.log.debug("Using extractor %s %s", 
extractor.__class__.__name__, str(task_info))
+                if complete:
+                    task_metadata = extractor.extract_on_complete(task_instance)
+                else:
+                    task_metadata = extractor.extract()
+
+                self.log.debug("Found task metadata for operation %s: %s", 
task.task_id, str(task_metadata))
+                if task_metadata:
+                    if (not task_metadata.inputs) and (not task_metadata.outputs):
+                        inlets = task.get_inlet_defs()
+                        outlets = task.get_outlet_defs()
+                        self.extract_inlets_and_outlets(task_metadata, inlets, outlets)
+
+                    return task_metadata
+
+            except Exception as e:
+                self.log.exception(
+                    "Failed to extract metadata using found extractor %s - %s 
%s", extractor, e, task_info
+                )
+        else:
+            self.log.debug("Unable to find an extractor %s", task_info)
+
+            # Only include the unkonwnSourceAttribute facet if there is no extractor
+            task_metadata = OperatorLineage(
+                run_facets={
+                    "unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
+                        unknownItems=[
+                            UnknownOperatorInstance(
+                                name=get_operator_class(task).__name__,
+                                properties={attr: value for attr, value in task.__dict__.items()},
+                            )
+                        ]
+                    )
+                },
+            )
+            inlets = task.get_inlet_defs()
+            outlets = task.get_outlet_defs()
+            self.extract_inlets_and_outlets(task_metadata, inlets, outlets)
+            return task_metadata
+
+        return OperatorLineage()
+
+    def _get_extractor_class(self, clazz: type) -> type[BaseExtractor] | None:
+        name = clazz.__name__
+        if name in self.extractors:
+            return self.extractors[name]
+
+        def method_exists(method_name):
+            method = getattr(clazz, method_name, None)
+            if method:
+                return callable(method)
+
+        if method_exists("get_openlineage_facets_on_start") or method_exists(
+            "get_openlineage_facets_on_complete"
+        ):
+            return self.default_extractor
+        return None
+
+    def _get_extractor(self, task) -> BaseExtractor | None:
+        # TODO: Re-enable in Extractor PR
+        # self.instantiate_abstract_extractors(task)
+        extractor = self._get_extractor_class(get_operator_class(task))
+        self.log.debug("extractor for %s is %s", task.__class__, extractor)

Review Comment:
   `get_operator_class` isn't needed, `task.task_type` should be used, so 
`_get_extractor_class` should take a classname, not a class.
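
   i.e. something like (sketch only, just the changed call site):
   ```python
   extractor = self._get_extractor_class(task.task_type)
   self.log.debug("extractor for %s is %s", task.task_type, extractor)
   ```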



##########
airflow/providers/openlineage/utils/converters.py:
##########
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.lineage.entities import Table
+from openlineage.client.run import Dataset
+
+
+def convert_to_ol_dataset(obj):

Review Comment:
   This is currently(?) only called from extractors.manager -- could this be a 
method on there instead?



##########
airflow/providers/openlineage/extractors/base.py:
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+
+from attrs import Factory, define
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+from openlineage.client.facet import BaseFacet
+from openlineage.client.run import Dataset
+
+
+@define
+class OperatorLineage:
+    """Structure returned from lineage extraction."""
+
+    inputs: list[Dataset] = Factory(list)
+    outputs: list[Dataset] = Factory(list)
+    run_facets: dict[str, BaseFacet] = Factory(dict)
+    job_facets: dict[str, BaseFacet] = Factory(dict)
+
+
+class BaseExtractor(ABC, LoggingMixin):
+    """
+    Abstract base extractor class.
+
+    This is used mostly to maintain support for custom extractors.
+    """
+
+    _allowed_query_params: list[str] = []
+
+    def __init__(self, operator):
+        super().__init__()
+        self.operator = operator
+        self.patch()
+
+    def patch(self):
+        # Extractor should register extension methods or patches to operator here
+        pass
+
+    @classmethod
+    @abstractmethod
+    def get_operator_classnames(cls) -> list[str]:
+        """
+        Implement this method returning list of operators that extractor works for.
+        Particularly, in Airflow 2 some operators are deprecated and simply subclass the new
+        implementation, for example BigQueryOperator:
+        https://github.com/apache/airflow/blob/main/airflow/contrib/operators/bigquery_operator.py

Review Comment:
   Probably shouldn't link to the `main` branch here, but to a short ref sha, else this link will not point to the right place in future.


##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowMappedTaskRunFacet,
+    AirflowRunArgsRunFacet,
+    AirflowRunFacet,
+    AirflowVersionRunFacet,
+)
+
+# TODO: move this maybe to Airflow's logic?
+from openlineage.client.utils import RedactMixin
+
+if TYPE_CHECKING:
+    from airflow.models import DAG, BaseOperator, Connection, DagRun, TaskInstance
+
+
+log = logging.getLogger(__name__)
+_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+def openlineage_job_name(dag_id: str, task_id: str) -> str:
+    return f"{dag_id}.{task_id}"
+
+
+def get_operator_class(task: BaseOperator) -> type:
+    if task.__class__.__name__ in ("DecoratedMappedOperator", 
"MappedOperator"):
+        return task.operator_class
+    return task.__class__
+
+
+def to_json_encodable(task: BaseOperator) -> dict[str, object]:
+    def _task_encoder(obj):
+        if isinstance(obj, datetime.datetime):
+            return obj.isoformat()
+        elif isinstance(obj, AIRFLOW_DAG):
+            return {
+                "dag_id": obj.dag_id,
+                "tags": obj.tags,
+                "schedule_interval": obj.schedule_interval,
+            }
+        else:
+            return str(obj)
+
+    return json.loads(json.dumps(task.__dict__, default=_task_encoder))
+
+
+def url_to_https(url) -> str | None:
+    # Ensure URL exists
+    if not url:
+        return None
+
+    base_url = None
+    if url.startswith("git@"):
+        part = url.split("git@")[1:2]
+        if part:
+            base_url = f'https://{part[0].replace(":", "/", 1)}'
+    elif url.startswith("https://";):
+        base_url = url
+
+    if not base_url:
+        raise ValueError(f"Unable to extract location from: {url}")
+
+    if base_url.endswith(".git"):
+        base_url = base_url[:-4]
+    return base_url
+
+
+def get_location(file_path) -> str | None:
+    # Ensure file path exists
+    if not file_path:
+        return None
+
+    # move to the file directory
+    abs_path = os.path.abspath(file_path)
+    file_name = os.path.basename(file_path)
+    cwd = os.path.dirname(abs_path)
+
+    # get the repo url
+    repo_url = execute_git(cwd, ["config", "--get", "remote.origin.url"])
+
+    # get the repo relative path
+    repo_relative_path = execute_git(cwd, ["rev-parse", "--show-prefix"])
+
+    # get the commitId for the particular file
+    commit_id = execute_git(cwd, ["rev-list", "HEAD", "-1", "--", file_name])
+
+    # build the URL
+    base_url = url_to_https(repo_url)
+    if not base_url:
+        return None
+
+    return f"{base_url}/blob/{commit_id}/{repo_relative_path}{file_name}"
+
+
+def get_task_location(task):
+    try:
+        if hasattr(task, "file_path") and task.file_path:
+            return get_location(task.file_path)
+        else:
+            return get_location(task.dag.fileloc)
+    except Exception:
+        return None
+
+
+def execute_git(cwd, params):
+    p = subprocess.Popen(["git"] + params, cwd=cwd, stdout=subprocess.PIPE, 
stderr=None)
+    p.wait(timeout=0.5)
+    out, err = p.communicate()
+    return out.decode("utf8").strip()
+
+
+def redacted_connection_uri(conn: Connection, filtered_params=None, filtered_prefixes=None):
+    """
+    Return the connection URI for the given Connection.
+    This method additionally filters URI by removing query parameters that are known to carry sensitive data
+    like username, password, access key.
+    """
+    if filtered_prefixes is None:
+        filtered_prefixes = []
+    if filtered_params is None:
+        filtered_params = []
+
+    def filter_key_params(k: str):
+        return k not in filtered_params and any(substr in k for substr in filtered_prefixes)
+
+    conn_uri = conn.get_uri()
+    parsed = urlparse(conn_uri)
+
+    # Remove username and password
+    netloc = f"{parsed.hostname}" + (f":{parsed.port}" if parsed.port else "")
+    parsed = parsed._replace(netloc=netloc)
+    if parsed.query:
+        query_dict = dict(parse_qsl(parsed.query))
+        if conn.EXTRA_KEY in query_dict:
+            query_dict = json.loads(query_dict[conn.EXTRA_KEY])
+        filtered_qs = {k: v for k, v in query_dict.items() if not filter_key_params(k)}
+        parsed = parsed._replace(query=urlencode(filtered_qs))
+    return urlunparse(parsed)
+
+
+def get_normalized_postgres_connection_uri(conn):
+    """
+    URIs starting with postgresql:// and postgres:// are both valid
+    PostgreSQL connection strings. This function normalizes it to
+    postgres:// as canonical name according to OpenLineage spec.
+    """
+    uri = redacted_connection_uri(conn)
+    if uri.startswith("postgresql"):
+        uri = uri.replace("postgresql", "postgres", 1)
+    return uri
+
+
+def get_connection(conn_id) -> Connection | None:
+    from airflow.hooks.base import BaseHook
+
+    with suppress(Exception):
+        return BaseHook.get_connection(conn_id=conn_id)
+    return None
+
+
+def get_job_name(task):
+    return f"{task.dag_id}.{task.task_id}"

Review Comment:
   What about mapped tasks? Need to be handled here or not?
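
   If they do, one (purely hypothetical) option is suffixing the map index taken from the task instance, e.g.:
   ```python
   def get_job_name(task_instance):
       name = f"{task_instance.dag_id}.{task_instance.task_id}"
       # hypothetical: distinguish mapped task instances by map_index
       if getattr(task_instance, "map_index", -1) >= 0:
           name += f".{task_instance.map_index}"
       return name
   ```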



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowMappedTaskRunFacet,
+    AirflowRunArgsRunFacet,
+    AirflowRunFacet,
+    AirflowVersionRunFacet,
+)
+
+# TODO: move this maybe to Airflow's logic?
+from openlineage.client.utils import RedactMixin
+
+if TYPE_CHECKING:
+    from airflow.models import DAG, BaseOperator, Connection, DagRun, TaskInstance
+
+
+log = logging.getLogger(__name__)
+_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+def openlineage_job_name(dag_id: str, task_id: str) -> str:
+    return f"{dag_id}.{task_id}"
+
+
+def get_operator_class(task: BaseOperator) -> type:
+    if task.__class__.__name__ in ("DecoratedMappedOperator", 
"MappedOperator"):
+        return task.operator_class
+    return task.__class__
+
+
+def to_json_encodable(task: BaseOperator) -> dict[str, object]:
+    def _task_encoder(obj):
+        if isinstance(obj, datetime.datetime):
+            return obj.isoformat()
+        elif isinstance(obj, AIRFLOW_DAG):
+            return {
+                "dag_id": obj.dag_id,
+                "tags": obj.tags,
+                "schedule_interval": obj.schedule_interval,
+            }
+        else:
+            return str(obj)
+
+    return json.loads(json.dumps(task.__dict__, default=_task_encoder))
+
+
+def url_to_https(url) -> str | None:
+    # Ensure URL exists
+    if not url:
+        return None
+
+    base_url = None
+    if url.startswith("git@"):
+        part = url.split("git@")[1:2]
+        if part:
+            base_url = f'https://{part[0].replace(":", "/", 1)}'
+    elif url.startswith("https://";):
+        base_url = url
+
+    if not base_url:
+        raise ValueError(f"Unable to extract location from: {url}")
+
+    if base_url.endswith(".git"):
+        base_url = base_url[:-4]
+    return base_url
+
+
+def get_location(file_path) -> str | None:
+    # Ensure file path exists
+    if not file_path:
+        return None
+
+    # move to the file directory
+    abs_path = os.path.abspath(file_path)
+    file_name = os.path.basename(file_path)
+    cwd = os.path.dirname(abs_path)
+
+    # get the repo url
+    repo_url = execute_git(cwd, ["config", "--get", "remote.origin.url"])
+
+    # get the repo relative path
+    repo_relative_path = execute_git(cwd, ["rev-parse", "--show-prefix"])
+
+    # get the commitId for the particular file
+    commit_id = execute_git(cwd, ["rev-list", "HEAD", "-1", "--", file_name])
+
+    # build the URL
+    base_url = url_to_https(repo_url)
+    if not base_url:
+        return None
+
+    return f"{base_url}/blob/{commit_id}/{repo_relative_path}{file_name}"
+
+
+def get_task_location(task):
+    try:
+        if hasattr(task, "file_path") and task.file_path:
+            return get_location(task.file_path)
+        else:
+            return get_location(task.dag.fileloc)
+    except Exception:
+        return None
+
+
+def execute_git(cwd, params):
+    p = subprocess.Popen(["git"] + params, cwd=cwd, stdout=subprocess.PIPE, 
stderr=None)
+    p.wait(timeout=0.5)
+    out, err = p.communicate()
+    return out.decode("utf8").strip()
+
+
+def redacted_connection_uri(conn: Connection, filtered_params=None, filtered_prefixes=None):
+    """
+    Return the connection URI for the given Connection.
+    This method additionally filters URI by removing query parameters that are known to carry sensitive data
+    like username, password, access key.
+    """
+    if filtered_prefixes is None:
+        filtered_prefixes = []
+    if filtered_params is None:
+        filtered_params = []
+
+    def filter_key_params(k: str):
+        return k not in filtered_params and any(substr in k for substr in filtered_prefixes)
+
+    conn_uri = conn.get_uri()
+    parsed = urlparse(conn_uri)
+
+    # Remove username and password
+    netloc = f"{parsed.hostname}" + (f":{parsed.port}" if parsed.port else "")
+    parsed = parsed._replace(netloc=netloc)
+    if parsed.query:
+        query_dict = dict(parse_qsl(parsed.query))
+        if conn.EXTRA_KEY in query_dict:
+            query_dict = json.loads(query_dict[conn.EXTRA_KEY])
+        filtered_qs = {k: v for k, v in query_dict.items() if not 
filter_key_params(k)}
+        parsed = parsed._replace(query=urlencode(filtered_qs))
+    return urlunparse(parsed)
+
+
+def get_normalized_postgres_connection_uri(conn):
+    """
+    URIs starting with postgresql:// and postgres:// are both valid
+    PostgreSQL connection strings. This function normalizes it to
+    postgres:// as canonical name according to OpenLineage spec.
+    """
+    uri = redacted_connection_uri(conn)
+    if uri.startswith("postgresql"):
+        uri = uri.replace("postgresql", "postgres", 1)
+    return uri
+
+
+def get_connection(conn_id) -> Connection | None:
+    from airflow.hooks.base import BaseHook
+
+    with suppress(Exception):

Review Comment:
   This seems very broad? Where is this used?
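
   If the intent is just "return `None` when the connection does not exist", something
   narrower along these lines might be enough (untested sketch -- it assumes
   `AirflowNotFoundException` is what `BaseHook.get_connection` raises for a missing
   connection):

   ```python
   from __future__ import annotations

   from airflow.exceptions import AirflowNotFoundException
   from airflow.hooks.base import BaseHook
   from airflow.models import Connection


   def get_connection(conn_id: str) -> Connection | None:
       # Catch only the "connection not found" case instead of every exception,
       # so genuine secrets-backend or configuration errors still surface.
       try:
           return BaseHook.get_connection(conn_id=conn_id)
       except AirflowNotFoundException:
           return None
   ```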



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowMappedTaskRunFacet,
+    AirflowRunArgsRunFacet,
+    AirflowRunFacet,
+    AirflowVersionRunFacet,
+)
+
+# TODO: move this maybe to Airflow's logic?
+from openlineage.client.utils import RedactMixin
+
+if TYPE_CHECKING:
+    from airflow.models import DAG, BaseOperator, Connection, DagRun, 
TaskInstance
+
+
+log = logging.getLogger(__name__)
+_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+def openlineage_job_name(dag_id: str, task_id: str) -> str:
+    return f"{dag_id}.{task_id}"
+
+
+def get_operator_class(task: BaseOperator) -> type:
+    if task.__class__.__name__ in ("DecoratedMappedOperator", 
"MappedOperator"):
+        return task.operator_class
+    return task.__class__
+
+
+def to_json_encodable(task: BaseOperator) -> dict[str, object]:
+    def _task_encoder(obj):
+        if isinstance(obj, datetime.datetime):
+            return obj.isoformat()
+        elif isinstance(obj, AIRFLOW_DAG):
+            return {
+                "dag_id": obj.dag_id,
+                "tags": obj.tags,
+                "schedule_interval": obj.schedule_interval,
+            }
+        else:
+            return str(obj)
+
+    return json.loads(json.dumps(task.__dict__, default=_task_encoder))
+
+
+def url_to_https(url) -> str | None:
+    # Ensure URL exists
+    if not url:
+        return None
+
+    base_url = None
+    if url.startswith("git@"):
+        part = url.split("git@")[1:2]
+        if part:
+            base_url = f'https://{part[0].replace(":", "/", 1)}'
+    elif url.startswith("https://"):
+        base_url = url
+
+    if not base_url:
+        raise ValueError(f"Unable to extract location from: {url}")
+
+    if base_url.endswith(".git"):
+        base_url = base_url[:-4]
+    return base_url
+
+
+def get_location(file_path) -> str | None:
+    # Ensure file path exists
+    if not file_path:
+        return None
+
+    # move to the file directory
+    abs_path = os.path.abspath(file_path)
+    file_name = os.path.basename(file_path)
+    cwd = os.path.dirname(abs_path)
+
+    # get the repo url
+    repo_url = execute_git(cwd, ["config", "--get", "remote.origin.url"])
+
+    # get the repo relative path
+    repo_relative_path = execute_git(cwd, ["rev-parse", "--show-prefix"])
+
+    # get the commitId for the particular file
+    commit_id = execute_git(cwd, ["rev-list", "HEAD", "-1", "--", file_name])
+
+    # build the URL
+    base_url = url_to_https(repo_url)
+    if not base_url:
+        return None
+
+    return f"{base_url}/blob/{commit_id}/{repo_relative_path}{file_name}"
+
+
+def get_task_location(task):
+    try:
+        if hasattr(task, "file_path") and task.file_path:

Review Comment:
   Does any version of airflow have `task.file_path`? 🤔 
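
   If `file_path` is only something custom operators set, maybe make the fallback
   explicit with `getattr`, so there is no open question about the attribute.
   Rough sketch of the same function, reusing the module's `get_location`:

   ```python
   def get_task_location(task):
       try:
           # Prefer an operator-provided file_path (not a standard BaseOperator
           # attribute), otherwise fall back to the DAG file location.
           return get_location(getattr(task, "file_path", None) or task.dag.fileloc)
       except Exception:
           return None
   ```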



##########
airflow/providers/openlineage/provider.yaml:
##########
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-openlineage
+name: OpenLineage Airflow
+description: |
+  `OpenLineage <https://openlineage.io/>`__
+
+versions:
+  - 1.0.0
+
+dependencies:
+  - apache-airflow>=2.5.1
+  - apache-airflow-providers-common-sql>=1.3.1
+  - attrs>=22.2
+  - openlineage-integration-common>=0.20.6
+  - openlineage-python>=0.20.6
+
+integrations:
+  - integration-name: OpenLineage
+    external-doc-url: https://openlineage.io
+    logo: /integration-logos/openlineage/openlineage.svg
+    tags: [apache]

Review Comment:
   I don't think this should be `apache`, as OpenLineage is not an ASF project



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowMappedTaskRunFacet,
+    AirflowRunArgsRunFacet,
+    AirflowRunFacet,
+    AirflowVersionRunFacet,
+)
+
+# TODO: move this maybe to Airflow's logic?
+from openlineage.client.utils import RedactMixin
+
+if TYPE_CHECKING:
+    from airflow.models import DAG, BaseOperator, Connection, DagRun, 
TaskInstance
+
+
+log = logging.getLogger(__name__)
+_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+def openlineage_job_name(dag_id: str, task_id: str) -> str:
+    return f"{dag_id}.{task_id}"
+
+
+def get_operator_class(task: BaseOperator) -> type:
+    if task.__class__.__name__ in ("DecoratedMappedOperator", 
"MappedOperator"):
+        return task.operator_class
+    return task.__class__
+
+
+def to_json_encodable(task: BaseOperator) -> dict[str, object]:
+    def _task_encoder(obj):
+        if isinstance(obj, datetime.datetime):
+            return obj.isoformat()
+        elif isinstance(obj, AIRFLOW_DAG):
+            return {
+                "dag_id": obj.dag_id,
+                "tags": obj.tags,
+                "schedule_interval": obj.schedule_interval,
+            }
+        else:
+            return str(obj)
+
+    return json.loads(json.dumps(task.__dict__, default=_task_encoder))
+
+
+def url_to_https(url) -> str | None:
+    # Ensure URL exists
+    if not url:
+        return None
+
+    base_url = None
+    if url.startswith("git@"):
+        part = url.split("git@")[1:2]
+        if part:
+            base_url = f'https://{part[0].replace(":", "/", 1)}'
+    elif url.startswith("https://"):
+        base_url = url
+
+    if not base_url:
+        raise ValueError(f"Unable to extract location from: {url}")
+
+    if base_url.endswith(".git"):
+        base_url = base_url[:-4]
+    return base_url
+
+
+def get_location(file_path) -> str | None:
+    # Ensure file path exists
+    if not file_path:
+        return None
+
+    # move to the file directory
+    abs_path = os.path.abspath(file_path)
+    file_name = os.path.basename(file_path)
+    cwd = os.path.dirname(abs_path)
+
+    # get the repo url
+    repo_url = execute_git(cwd, ["config", "--get", "remote.origin.url"])
+
+    # get the repo relative path
+    repo_relative_path = execute_git(cwd, ["rev-parse", "--show-prefix"])
+
+    # get the commitId for the particular file
+    commit_id = execute_git(cwd, ["rev-list", "HEAD", "-1", "--", file_name])
+
+    # build the URL
+    base_url = url_to_https(repo_url)
+    if not base_url:
+        return None
+
+    return f"{base_url}/blob/{commit_id}/{repo_relative_path}{file_name}"
+
+
+def get_task_location(task):
+    try:
+        if hasattr(task, "file_path") and task.file_path:
+            return get_location(task.file_path)
+        else:
+            return get_location(task.dag.fileloc)
+    except Exception:
+        return None
+
+
+def execute_git(cwd, params):
+    p = subprocess.Popen(["git"] + params, cwd=cwd, stdout=subprocess.PIPE, 
stderr=None)
+    p.wait(timeout=0.5)
+    out, err = p.communicate()
+    return out.decode("utf8").strip()
+
+
+def redacted_connection_uri(conn: Connection, filtered_params=None, 
filtered_prefixes=None):
+    """
+    Return the connection URI for the given Connection.
+    This method additionally filters URI by removing query parameters that are 
known to carry sensitive data
+    like username, password, access key.
+    """
+    if filtered_prefixes is None:
+        filtered_prefixes = []
+    if filtered_params is None:
+        filtered_params = []
+
+    def filter_key_params(k: str):
+        return k not in filtered_params and any(substr in k for substr in 
filtered_prefixes)
+
+    conn_uri = conn.get_uri()
+    parsed = urlparse(conn_uri)
+
+    # Remove username and password
+    netloc = f"{parsed.hostname}" + (f":{parsed.port}" if parsed.port else "")
+    parsed = parsed._replace(netloc=netloc)
+    if parsed.query:
+        query_dict = dict(parse_qsl(parsed.query))
+        if conn.EXTRA_KEY in query_dict:
+            query_dict = json.loads(query_dict[conn.EXTRA_KEY])
+        filtered_qs = {k: v for k, v in query_dict.items() if not 
filter_key_params(k)}
+        parsed = parsed._replace(query=urlencode(filtered_qs))
+    return urlunparse(parsed)
+
+
+def get_normalized_postgres_connection_uri(conn):
+    """
+    URIs starting with postgresql:// and postgres:// are both valid
+    PostgreSQL connection strings. This function normalizes it to
+    postgres:// as canonical name according to OpenLineage spec.
+    """
+    uri = redacted_connection_uri(conn)
+    if uri.startswith("postgresql"):
+        uri = uri.replace("postgresql", "postgres", 1)
+    return uri
+
+
+def get_connection(conn_id) -> Connection | None:
+    from airflow.hooks.base import BaseHook
+
+    with suppress(Exception):
+        return BaseHook.get_connection(conn_id=conn_id)
+    return None
+
+
+def get_job_name(task):
+    return f"{task.dag_id}.{task.task_id}"
+
+
+def get_custom_facets(
+    dagrun, task, is_external_trigger: bool, task_instance: TaskInstance | 
None = None
+) -> dict[str, Any]:
+    custom_facets = {
+        "airflow_runArgs": AirflowRunArgsRunFacet(is_external_trigger),
+        "airflow_version": AirflowVersionRunFacet.from_dagrun_and_task(dagrun, 
task),

Review Comment:
   I guess this is what OL expects? But the facet's name doesn't match my
expectations of what it contains.



##########
tests/providers/openlineage/conftest.py:
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Copyright 2018-2023 contributors to the OpenLineage project
+# SPDX-License-Identifier: Apache-2.0
+from __future__ import annotations
+
+import logging
+import os
+from unittest.mock import patch
+
+import pytest
+
+log = logging.getLogger(__name__)
+
+
[email protected](scope="function")
+def remove_redshift_conn():
+    if "REDSHIFT_CONN" in os.environ:
+        del os.environ["REDSHIFT_CONN"]
+    if "WRITE_SCHEMA" in os.environ:
+        del os.environ["WRITE_SCHEMA"]
+
+
[email protected](scope="function")
+def we_module_env():
+    os.environ["REDSHIFT_CONN"] = "postgresql://user:[email protected]:1234/db"
+    os.environ["WRITE_SCHEMA"] = "testing"

Review Comment:
   I think this will work, and it'll automatically get cleared after the test 
function finishes.
   
   ```suggestion
   @pytest.fixture(scope="function")
   def we_module_env(monkeypatch):
       monkeypatch.setenv("REDSHIFT_CONN", 
"postgresql://user:[email protected]:1234/db")
       monkeypatch.setenv("WRITE_SCHEMA", "testing")
   ```
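
   The same applies to `remove_redshift_conn` above -- `monkeypatch.delenv` restores
   the environment afterwards, and `raising=False` makes it a no-op when the
   variable isn't set (sketch):

   ```python
   import pytest


   @pytest.fixture
   def remove_redshift_conn(monkeypatch):
       # delenv with raising=False: no error if the variable is absent, and the
       # original environment is restored automatically after the test.
       monkeypatch.delenv("REDSHIFT_CONN", raising=False)
       monkeypatch.delenv("WRITE_SCHEMA", raising=False)
   ```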



##########
airflow/providers/openlineage/plugins/listener.py:
##########
@@ -0,0 +1,189 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from concurrent.futures import Executor, ThreadPoolExecutor
+from typing import TYPE_CHECKING
+
+from airflow.listeners import hookimpl
+from airflow.providers.openlineage.extractors import ExtractorManager
+from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
+from airflow.providers.openlineage.utils import (
+    get_airflow_run_facet,
+    get_custom_facets,
+    get_job_name,
+    get_task_location,
+    print_exception,
+)
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+    from airflow.models import DagRun, TaskInstance
+
+
+class OpenLineageListener:
+    """
+    OpenLineage listener
+    Sends events on task instance and dag run starts, completes and failures.
+    """
+
+    def __init__(self):
+        self.log = logging.getLogger(__name__)
+        self.executor: Executor = None  # type: ignore
+        self.extractor_manager = ExtractorManager()
+        self.adapter = OpenLineageAdapter()
+
+    @hookimpl
+    def on_task_instance_running(
+        self, previous_state, task_instance: TaskInstance, session: Session  # 
This will always be QUEUED
+    ):
+        if not hasattr(task_instance, "task"):
+            self.log.warning(
+                f"No task set for TI object task_id: {task_instance.task_id} - 
"
+                f"dag_id: {task_instance.dag_id} - run_id 
{task_instance.run_id}"
+            )
+            return
+
+        self.log.debug("OpenLineage listener got notification about task 
instance start")
+        dagrun = task_instance.dag_run
+        task = task_instance.task
+        dag = task.dag
+
+        @print_exception
+        def on_running():
+            # that's a workaround to detect task running from deferred state
+            # we return here because Airflow 2.3 needs task from deferred state
+            if task_instance.next_method is not None:
+                return
+            parent_run_id = self.adapter.build_dag_run_id(dag.dag_id, 
dagrun.run_id)
+
+            task_uuid = self.adapter.build_task_instance_run_id(
+                task.task_id, task_instance.execution_date, 
task_instance.try_number
+            )
+
+            task_metadata = self.extractor_manager.extract_metadata(dagrun, 
task)
+
+            self.adapter.start_task(
+                run_id=task_uuid,
+                job_name=get_job_name(task),
+                job_description=dag.description,
+                event_time=task_instance.start_date.isoformat(),
+                parent_job_name=dag.dag_id,
+                parent_run_id=parent_run_id,
+                code_location=get_task_location(task),
+                nominal_start_time=dagrun.data_interval_start.isoformat(),
+                nominal_end_time=dagrun.data_interval_end.isoformat(),
+                owners=dag.owner.split(", "),
+                task=task_metadata,
+                run_facets={
+                    **task_metadata.run_facets,
+                    **get_custom_facets(dagrun, task, dagrun.external_trigger, 
task_instance),
+                    **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
+                },
+            )
+
+        self.executor.submit(on_running)
+
+    @hookimpl
+    def on_task_instance_success(self, previous_state, task_instance: 
TaskInstance, session):
+        self.log.debug("OpenLineage listener got notification about task 
instance success")
+
+        dagrun = task_instance.dag_run
+        task = task_instance.task
+
+        task_uuid = OpenLineageAdapter.build_task_instance_run_id(
+            task.task_id, task_instance.execution_date, 
task_instance.try_number - 1
+        )
+
+        @print_exception
+        def on_success():
+            task_metadata = self.extractor_manager.extract_metadata(
+                dagrun, task, complete=True, task_instance=task_instance
+            )
+            self.adapter.complete_task(
+                run_id=task_uuid,
+                job_name=get_job_name(task),
+                end_time=task_instance.end_date.isoformat(),
+                task=task_metadata,
+            )
+
+        self.executor.submit(on_success)
+
+    @hookimpl
+    def on_task_instance_failed(self, previous_state, task_instance: 
TaskInstance, session):
+        self.log.debug("OpenLineage listener got notification about task 
instance failure")
+
+        dagrun = task_instance.dag_run
+        task = task_instance.task
+
+        task_uuid = OpenLineageAdapter.build_task_instance_run_id(
+            task.task_id, task_instance.execution_date, 
task_instance.try_number - 1
+        )
+
+        @print_exception
+        def on_failure():
+            task_metadata = self.extractor_manager.extract_metadata(
+                dagrun, task, complete=True, task_instance=task_instance
+            )
+
+            self.adapter.fail_task(
+                run_id=task_uuid,
+                job_name=get_job_name(task),
+                end_time=task_instance.end_date.isoformat(),
+                task=task_metadata,
+            )
+
+        self.executor.submit(on_failure)
+
+    @hookimpl
+    def on_starting(self, component):
+        self.log.debug("on_starting: %s", component.__class__.__name__)
+        self.executor = ThreadPoolExecutor(max_workers=8, 
thread_name_prefix="openlineage_")
+
+    @hookimpl
+    def before_stopping(self, component):
+        self.log.debug("before_stopping: %s", component.__class__.__name__)
+        self.executor.shutdown(wait=True)

Review Comment:
   Yes please. 5s? 30s?
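
   For reference, `Executor.shutdown()` doesn't take a timeout, so bounding the wait
   probably means keeping the submitted futures around and waiting on them explicitly.
   Rough, untested sketch (the 30s default is just a placeholder):

   ```python
   from __future__ import annotations

   import concurrent.futures
   from concurrent.futures import Future, ThreadPoolExecutor

   executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
   futures: list[Future] = []  # append to this on every executor.submit()


   def stop_with_timeout(timeout: float = 30.0) -> None:
       # Wait up to `timeout` seconds for outstanding lineage events, then shut
       # the pool down without blocking on anything still running.
       concurrent.futures.wait(futures, timeout=timeout)
       executor.shutdown(wait=False)
   ```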



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowMappedTaskRunFacet,
+    AirflowRunArgsRunFacet,
+    AirflowRunFacet,
+    AirflowVersionRunFacet,
+)
+
+# TODO: move this maybe to Airflow's logic?
+from openlineage.client.utils import RedactMixin
+
+if TYPE_CHECKING:
+    from airflow.models import DAG, BaseOperator, Connection, DagRun, 
TaskInstance
+
+
+log = logging.getLogger(__name__)
+_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+def openlineage_job_name(dag_id: str, task_id: str) -> str:
+    return f"{dag_id}.{task_id}"
+
+
+def get_operator_class(task: BaseOperator) -> type:
+    if task.__class__.__name__ in ("DecoratedMappedOperator", 
"MappedOperator"):
+        return task.operator_class
+    return task.__class__
+
+
+def to_json_encodable(task: BaseOperator) -> dict[str, object]:
+    def _task_encoder(obj):
+        if isinstance(obj, datetime.datetime):
+            return obj.isoformat()
+        elif isinstance(obj, AIRFLOW_DAG):
+            return {
+                "dag_id": obj.dag_id,
+                "tags": obj.tags,
+                "schedule_interval": obj.schedule_interval,
+            }
+        else:
+            return str(obj)
+
+    return json.loads(json.dumps(task.__dict__, default=_task_encoder))
+
+
+def url_to_https(url) -> str | None:
+    # Ensure URL exists
+    if not url:
+        return None
+
+    base_url = None
+    if url.startswith("git@"):
+        part = url.split("git@")[1:2]
+        if part:
+            base_url = f'https://{part[0].replace(":", "/", 1)}'
+    elif url.startswith("https://"):
+        base_url = url
+
+    if not base_url:
+        raise ValueError(f"Unable to extract location from: {url}")
+
+    if base_url.endswith(".git"):
+        base_url = base_url[:-4]
+    return base_url
+
+
+def get_location(file_path) -> str | None:
+    # Ensure file path exists
+    if not file_path:
+        return None
+
+    # move to the file directory
+    abs_path = os.path.abspath(file_path)
+    file_name = os.path.basename(file_path)
+    cwd = os.path.dirname(abs_path)
+
+    # get the repo url
+    repo_url = execute_git(cwd, ["config", "--get", "remote.origin.url"])
+
+    # get the repo relative path
+    repo_relative_path = execute_git(cwd, ["rev-parse", "--show-prefix"])
+
+    # get the commitId for the particular file
+    commit_id = execute_git(cwd, ["rev-list", "HEAD", "-1", "--", file_name])
+
+    # build the URL
+    base_url = url_to_https(repo_url)
+    if not base_url:
+        return None
+
+    return f"{base_url}/blob/{commit_id}/{repo_relative_path}{file_name}"
+
+
+def get_task_location(task):
+    try:
+        if hasattr(task, "file_path") and task.file_path:
+            return get_location(task.file_path)
+        else:
+            return get_location(task.dag.fileloc)
+    except Exception:
+        return None
+
+
+def execute_git(cwd, params):
+    p = subprocess.Popen(["git"] + params, cwd=cwd, stdout=subprocess.PIPE, 
stderr=None)
+    p.wait(timeout=0.5)
+    out, err = p.communicate()
+    return out.decode("utf8").strip()
+
+
+def redacted_connection_uri(conn: Connection, filtered_params=None, 
filtered_prefixes=None):
+    """
+    Return the connection URI for the given Connection.
+    This method additionally filters URI by removing query parameters that are 
known to carry sensitive data
+    like username, password, access key.
+    """
+    if filtered_prefixes is None:
+        filtered_prefixes = []
+    if filtered_params is None:
+        filtered_params = []
+
+    def filter_key_params(k: str):
+        return k not in filtered_params and any(substr in k for substr in 
filtered_prefixes)
+
+    conn_uri = conn.get_uri()
+    parsed = urlparse(conn_uri)
+
+    # Remove username and password
+    netloc = f"{parsed.hostname}" + (f":{parsed.port}" if parsed.port else "")
+    parsed = parsed._replace(netloc=netloc)
+    if parsed.query:
+        query_dict = dict(parse_qsl(parsed.query))
+        if conn.EXTRA_KEY in query_dict:
+            query_dict = json.loads(query_dict[conn.EXTRA_KEY])
+        filtered_qs = {k: v for k, v in query_dict.items() if not 
filter_key_params(k)}
+        parsed = parsed._replace(query=urlencode(filtered_qs))
+    return urlunparse(parsed)
+
+
+def get_normalized_postgres_connection_uri(conn):
+    """
+    URIs starting with postgresql:// and postgres:// are both valid
+    PostgreSQL connection strings. This function normalizes it to
+    postgres:// as canonical name according to OpenLineage spec.
+    """
+    uri = redacted_connection_uri(conn)
+    if uri.startswith("postgresql"):
+        uri = uri.replace("postgresql", "postgres", 1)
+    return uri
+
+
+def get_connection(conn_id) -> Connection | None:
+    from airflow.hooks.base import BaseHook
+
+    with suppress(Exception):
+        return BaseHook.get_connection(conn_id=conn_id)
+    return None
+
+
+def get_job_name(task):
+    return f"{task.dag_id}.{task.task_id}"
+
+
+def get_custom_facets(
+    dagrun, task, is_external_trigger: bool, task_instance: TaskInstance | 
None = None
+) -> dict[str, Any]:
+    custom_facets = {
+        "airflow_runArgs": AirflowRunArgsRunFacet(is_external_trigger),
+        "airflow_version": AirflowVersionRunFacet.from_dagrun_and_task(dagrun, 
task),
+    }
+    # check for -1 comes from SmartSensor compatibility with dynamic task 
mapping
+    # this comes from Airflow code
+    if hasattr(task_instance, "map_index") and getattr(task_instance, 
"map_index") != -1:
+        custom_facets["airflow_mappedTask"] = 
AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+    return custom_facets
+
+
+class InfoJsonEncodable(dict):
+    """
+    Airflow objects might not be json-encodable overall.
+
+    The class provides additional attributes to control
+    what and how is encoded:
+    * renames: a dictionary of attribute name changes
+    * casts: a dictionary consisting of attribute names
+             and corresponding methods that should change
+             object value
+    * includes: list of attributes to be included in encoding
+    * excludes: list of attributes to be excluded from encoding
+
+    Don't use both includes and excludes.
+    """
+
+    renames: dict[str, str] = {}
+    casts: dict[str, Any] = {}
+    includes: list[str] = []
+    excludes: list[str] = []
+
+    def __init__(self, obj):
+        self.obj = obj
+        self._fields = []
+
+        self._cast_fields()
+        self._rename_fields()
+        self._include_fields()
+        dict.__init__(
+            self,
+            **{field: InfoJsonEncodable._cast_basic_types(getattr(self, 
field)) for field in self._fields},
+        )
+
+    @staticmethod
+    def _cast_basic_types(value):
+        if isinstance(value, datetime.datetime):
+            return value.isoformat()
+        if isinstance(value, (set, list, tuple)):
+            return str(list(value))
+        return value
+
+    def _rename_fields(self):
+        for field, renamed in self.renames.items():
+            if hasattr(self.obj, field):
+                setattr(self, renamed, getattr(self.obj, field))
+                self._fields.append(renamed)
+
+    def _cast_fields(self):
+        for field, func in self.casts.items():
+            setattr(self, field, func(self.obj))
+            self._fields.append(field)
+
+    def _include_fields(self):
+        if self.includes and self.excludes:
+            raise Exception("Don't use both includes and excludes.")
+        if self.includes:
+            for field in self.includes:
+                if field in self._fields or not hasattr(self.obj, field):
+                    continue
+                setattr(self, field, getattr(self.obj, field))
+                self._fields.append(field)
+        else:
+            for field, val in self.obj.__dict__.items():
+                if field in self._fields or field in self.excludes or field in 
self.renames:
+                    continue
+                setattr(self, field, val)
+                self._fields.append(field)
+
+
+class DagInfo(InfoJsonEncodable):
+    """Defines encoding DAG object to JSON."""
+
+    includes = ["dag_id", "schedule_interval", "tags", "start_date"]
+    casts = {"timetable": lambda dag: dag.timetable.serialize() if 
getattr(dag, "timetable", None) else None}
+    renames = {"_dag_id": "dag_id"}
+
+
+class DagRunInfo(InfoJsonEncodable):
+    """Defines encoding DagRun object to JSON."""
+
+    includes = [
+        "conf",
+        "dag_id",
+        "data_interval_start",
+        "data_interval_end",
+        "external_trigger",
+        "run_id",
+        "run_type",
+        "start_date",
+    ]
+
+
+class TaskInstanceInfo(InfoJsonEncodable):
+    """Defines encoding TaskInstance object to JSON."""
+
+    includes = ["duration", "try_number", "pool"]
+    casts = {
+        "map_index": lambda ti: ti.map_index
+        if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1
+        else None
+    }
+
+
+class TaskInfo(InfoJsonEncodable):
+    """Defines encoding BaseOperator/AbstractOperator object to JSON."""
+
+    renames = {
+        "_BaseOperator__init_kwargs": "args",
+        "_BaseOperator__from_mapped": "mapped",
+        "_downstream_task_ids": "downstream_task_ids",
+        "_upstream_task_ids": "upstream_task_ids",
+    }
+    excludes = [
+        "_BaseOperator__instantiated",
+        "_dag",
+        "_hook",
+        "_log",
+        "_outlets",
+        "_inlets",
+        "_lock_for_execution",
+        "handler",
+        "params",
+        "python_callable",
+        "retry_delay",
+    ]
+    casts = {
+        "operator_class": lambda task: 
f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}",  
# noqa
+        "task_group": lambda task: TaskGroupInfo(task.task_group)
+        if hasattr(task, "task_group") and getattr(task.task_group, 
"_group_id", None)
+        else None,
+    }
+
+
+class TaskGroupInfo(InfoJsonEncodable):
+    """Defines encoding TaskGroup object to JSON."""
+
+    renames = {
+        "_group_id": "group_id",
+    }
+    includes = [
+        "downstream_group_ids",
+        "downstream_task_ids",
+        "prefix_group_id",
+        "tooltip",
+        "upstream_group_ids",
+        "upstream_task_ids",
+    ]
+
+
+def get_airflow_run_facet(
+    dag_run: DagRun,
+    dag: DAG,
+    task_instance: TaskInstance,
+    task: BaseOperator,
+    task_uuid: str,
+):
+    return {
+        "airflow": json.loads(
+            json.dumps(
+                asdict(
+                    AirflowRunFacet(
+                        dag=DagInfo(dag),
+                        dagRun=DagRunInfo(dag_run),
+                        taskInstance=TaskInstanceInfo(task_instance),
+                        task=TaskInfo(task),
+                        taskUuid=task_uuid,
+                    )
+                ),
+                default=str,
+            )
+        )
+    }
+
+
+def import_from_string(path: str):
+    try:
+        module_path, target = path.rsplit(".", 1)
+        module = importlib.import_module(module_path)
+        return getattr(module, target)
+    except Exception as e:
+        log.warning(e)
+        raise ImportError(f"Failed to import {path}") from e
+
+
+def try_import_from_string(path: str):
+    with suppress(ImportError):
+        return import_from_string(path)
+
+
+def redact_with_exclusions(source: Any):
+    """This function redacts sensitive data similar to SecretsMasker in 
Airflow logs.
+    The difference is that MAX_RECURSION_DEPTH is way higher - due to the 
structure of OL events
+    we need more depth.
+    Also, we allow data structures to specify data that needs not to be 
redacted by specifying
+    _skip_redact list.
+    """
+    try:
+        from airflow.utils.log.secrets_masker import (
+            _secrets_masker,
+            should_hide_value_for_key,
+        )
+    except ImportError:
+        return source
+
+    sm = _secrets_masker()
+    MAX_RECURSION_DEPTH = 20
+
+    def _redact(item, name: str | None, depth: int):
+        if depth > MAX_RECURSION_DEPTH:
+            return item
+        try:
+            if name and should_hide_value_for_key(name):
+                return sm._redact_all(item, depth)
+            if isinstance(item, dict):
+                return {
+                    dict_key: _redact(subval, name=dict_key, depth=(depth + 1))
+                    for dict_key, subval in item.items()
+                }
+            elif attrs.has(item.__class__) or (is_json_serializable(item) and 
hasattr(item, "__dict__")):
+                for dict_key, subval in item.__dict__.items():

Review Comment:
   Attrs classes are not always going to have a `__dict__` -- does this matter?
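
   If it does matter, iterating via `attrs.fields()` would cover slotted attrs classes
   that have no `__dict__` -- something like this (sketch):

   ```python
   import attrs


   def _iter_fields(item):
       # attrs classes (including slotted ones without __dict__) expose their
       # fields via attrs.fields(); fall back to __dict__ for plain objects.
       if attrs.has(item.__class__):
           for field in attrs.fields(item.__class__):
               yield field.name, getattr(item, field.name)
       elif hasattr(item, "__dict__"):
           yield from item.__dict__.items()
   ```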



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+import importlib
+import json
+import logging
+import os
+import subprocess
+from contextlib import suppress
+from functools import wraps
+from typing import TYPE_CHECKING, Any
+from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
+
+import attrs
+from attrs import asdict
+
+from airflow.models import DAG as AIRFLOW_DAG
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowMappedTaskRunFacet,
+    AirflowRunArgsRunFacet,
+    AirflowRunFacet,
+    AirflowVersionRunFacet,
+)
+
+# TODO: move this maybe to Airflow's logic?
+from openlineage.client.utils import RedactMixin
+
+if TYPE_CHECKING:
+    from airflow.models import DAG, BaseOperator, Connection, DagRun, 
TaskInstance
+
+
+log = logging.getLogger(__name__)
+_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
+
+
+def openlineage_job_name(dag_id: str, task_id: str) -> str:
+    return f"{dag_id}.{task_id}"
+
+
+def get_operator_class(task: BaseOperator) -> type:
+    if task.__class__.__name__ in ("DecoratedMappedOperator", 
"MappedOperator"):
+        return task.operator_class
+    return task.__class__
+
+
+def to_json_encodable(task: BaseOperator) -> dict[str, object]:
+    def _task_encoder(obj):
+        if isinstance(obj, datetime.datetime):
+            return obj.isoformat()
+        elif isinstance(obj, AIRFLOW_DAG):
+            return {
+                "dag_id": obj.dag_id,
+                "tags": obj.tags,
+                "schedule_interval": obj.schedule_interval,
+            }
+        else:
+            return str(obj)
+
+    return json.loads(json.dumps(task.__dict__, default=_task_encoder))
+
+
+def url_to_https(url) -> str | None:
+    # Ensure URL exists
+    if not url:
+        return None
+
+    base_url = None
+    if url.startswith("git@"):
+        part = url.split("git@")[1:2]
+        if part:
+            base_url = f'https://{part[0].replace(":", "/", 1)}'
+    elif url.startswith("https://"):
+        base_url = url
+
+    if not base_url:
+        raise ValueError(f"Unable to extract location from: {url}")
+
+    if base_url.endswith(".git"):
+        base_url = base_url[:-4]
+    return base_url
+
+
+def get_location(file_path) -> str | None:
+    # Ensure file path exists
+    if not file_path:
+        return None
+
+    # move to the file directory
+    abs_path = os.path.abspath(file_path)
+    file_name = os.path.basename(file_path)
+    cwd = os.path.dirname(abs_path)
+
+    # get the repo url
+    repo_url = execute_git(cwd, ["config", "--get", "remote.origin.url"])
+
+    # get the repo relative path
+    repo_relative_path = execute_git(cwd, ["rev-parse", "--show-prefix"])
+
+    # get the commitId for the particular file
+    commit_id = execute_git(cwd, ["rev-list", "HEAD", "-1", "--", file_name])
+
+    # build the URL
+    base_url = url_to_https(repo_url)
+    if not base_url:
+        return None
+
+    return f"{base_url}/blob/{commit_id}/{repo_relative_path}{file_name}"
+
+
+def get_task_location(task):
+    try:
+        if hasattr(task, "file_path") and task.file_path:
+            return get_location(task.file_path)
+        else:
+            return get_location(task.dag.fileloc)
+    except Exception:
+        return None
+
+
+def execute_git(cwd, params):
+    p = subprocess.Popen(["git"] + params, cwd=cwd, stdout=subprocess.PIPE, 
stderr=None)
+    p.wait(timeout=0.5)
+    out, err = p.communicate()
+    return out.decode("utf8").strip()
+
+
+def redacted_connection_uri(conn: Connection, filtered_params=None, 
filtered_prefixes=None):
+    """
+    Return the connection URI for the given Connection.
+    This method additionally filters URI by removing query parameters that are 
known to carry sensitive data
+    like username, password, access key.
+    """
+    if filtered_prefixes is None:
+        filtered_prefixes = []
+    if filtered_params is None:
+        filtered_params = []
+
+    def filter_key_params(k: str):
+        return k not in filtered_params and any(substr in k for substr in 
filtered_prefixes)
+
+    conn_uri = conn.get_uri()
+    parsed = urlparse(conn_uri)
+
+    # Remove username and password
+    netloc = f"{parsed.hostname}" + (f":{parsed.port}" if parsed.port else "")
+    parsed = parsed._replace(netloc=netloc)
+    if parsed.query:
+        query_dict = dict(parse_qsl(parsed.query))
+        if conn.EXTRA_KEY in query_dict:
+            query_dict = json.loads(query_dict[conn.EXTRA_KEY])
+        filtered_qs = {k: v for k, v in query_dict.items() if not 
filter_key_params(k)}
+        parsed = parsed._replace(query=urlencode(filtered_qs))
+    return urlunparse(parsed)
+
+
+def get_normalized_postgres_connection_uri(conn):
+    """
+    URIs starting with postgresql:// and postgres:// are both valid
+    PostgreSQL connection strings. This function normalizes it to
+    postgres:// as canonical name according to OpenLineage spec.
+    """
+    uri = redacted_connection_uri(conn)
+    if uri.startswith("postgresql"):
+        uri = uri.replace("postgresql", "postgres", 1)
+    return uri
+
+
+def get_connection(conn_id) -> Connection | None:
+    from airflow.hooks.base import BaseHook
+
+    with suppress(Exception):
+        return BaseHook.get_connection(conn_id=conn_id)
+    return None
+
+
+def get_job_name(task):
+    return f"{task.dag_id}.{task.task_id}"
+
+
+def get_custom_facets(
+    dagrun, task, is_external_trigger: bool, task_instance: TaskInstance | 
None = None
+) -> dict[str, Any]:
+    custom_facets = {
+        "airflow_runArgs": AirflowRunArgsRunFacet(is_external_trigger),
+        "airflow_version": AirflowVersionRunFacet.from_dagrun_and_task(dagrun, 
task),
+    }
+    # check for -1 comes from SmartSensor compatibility with dynamic task 
mapping
+    # this comes from Airflow code
+    if hasattr(task_instance, "map_index") and getattr(task_instance, 
"map_index") != -1:
+        custom_facets["airflow_mappedTask"] = 
AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+    return custom_facets
+
+
+class InfoJsonEncodable(dict):
+    """
+    Airflow objects might not be json-encodable overall.
+
+    The class provides additional attributes to control
+    what and how is encoded:
+    * renames: a dictionary of attribute name changes
+    * casts: a dictionary consisting of attribute names
+             and corresponding methods that should change
+             object value
+    * includes: list of attributes to be included in encoding
+    * excludes: list of attributes to be excluded from encoding
+
+    Don't use both includes and excludes.
+    """
+
+    renames: dict[str, str] = {}
+    casts: dict[str, Any] = {}
+    includes: list[str] = []
+    excludes: list[str] = []
+
+    def __init__(self, obj):
+        self.obj = obj
+        self._fields = []
+
+        self._cast_fields()
+        self._rename_fields()
+        self._include_fields()
+        dict.__init__(
+            self,
+            **{field: InfoJsonEncodable._cast_basic_types(getattr(self, 
field)) for field in self._fields},
+        )
+
+    @staticmethod
+    def _cast_basic_types(value):
+        if isinstance(value, datetime.datetime):
+            return value.isoformat()
+        if isinstance(value, (set, list, tuple)):
+            return str(list(value))
+        return value
+
+    def _rename_fields(self):
+        for field, renamed in self.renames.items():
+            if hasattr(self.obj, field):
+                setattr(self, renamed, getattr(self.obj, field))
+                self._fields.append(renamed)
+
+    def _cast_fields(self):
+        for field, func in self.casts.items():
+            setattr(self, field, func(self.obj))
+            self._fields.append(field)
+
+    def _include_fields(self):
+        if self.includes and self.excludes:
+            raise Exception("Don't use both includes and excludes.")
+        if self.includes:
+            for field in self.includes:
+                if field in self._fields or not hasattr(self.obj, field):
+                    continue
+                setattr(self, field, getattr(self.obj, field))
+                self._fields.append(field)
+        else:
+            for field, val in self.obj.__dict__.items():
+                if field in self._fields or field in self.excludes or field in 
self.renames:
+                    continue
+                setattr(self, field, val)
+                self._fields.append(field)
+
+
+class DagInfo(InfoJsonEncodable):
+    """Defines encoding DAG object to JSON."""
+
+    includes = ["dag_id", "schedule_interval", "tags", "start_date"]
+    casts = {"timetable": lambda dag: dag.timetable.serialize() if 
getattr(dag, "timetable", None) else None}
+    renames = {"_dag_id": "dag_id"}
+
+
+class DagRunInfo(InfoJsonEncodable):
+    """Defines encoding DagRun object to JSON."""
+
+    includes = [
+        "conf",
+        "dag_id",
+        "data_interval_start",
+        "data_interval_end",
+        "external_trigger",
+        "run_id",
+        "run_type",
+        "start_date",
+    ]
+
+
+class TaskInstanceInfo(InfoJsonEncodable):
+    """Defines encoding TaskInstance object to JSON."""
+
+    includes = ["duration", "try_number", "pool"]
+    casts = {
+        "map_index": lambda ti: ti.map_index
+        if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1
+        else None
+    }
+
+
+class TaskInfo(InfoJsonEncodable):
+    """Defines encoding BaseOperator/AbstractOperator object to JSON."""
+
+    renames = {
+        "_BaseOperator__init_kwargs": "args",
+        "_BaseOperator__from_mapped": "mapped",
+        "_downstream_task_ids": "downstream_task_ids",
+        "_upstream_task_ids": "upstream_task_ids",
+    }
+    excludes = [
+        "_BaseOperator__instantiated",
+        "_dag",
+        "_hook",
+        "_log",
+        "_outlets",
+        "_inlets",
+        "_lock_for_execution",
+        "handler",
+        "params",
+        "python_callable",
+        "retry_delay",
+    ]
+    casts = {
+        "operator_class": lambda task: 
f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}",  
# noqa
+        "task_group": lambda task: TaskGroupInfo(task.task_group)
+        if hasattr(task, "task_group") and getattr(task.task_group, 
"_group_id", None)
+        else None,
+    }
+
+
+class TaskGroupInfo(InfoJsonEncodable):
+    """Defines encoding TaskGroup object to JSON."""
+
+    renames = {
+        "_group_id": "group_id",
+    }
+    includes = [
+        "downstream_group_ids",
+        "downstream_task_ids",
+        "prefix_group_id",
+        "tooltip",
+        "upstream_group_ids",
+        "upstream_task_ids",
+    ]
+
+
+def get_airflow_run_facet(
+    dag_run: DagRun,
+    dag: DAG,
+    task_instance: TaskInstance,
+    task: BaseOperator,
+    task_uuid: str,
+):
+    return {
+        "airflow": json.loads(
+            json.dumps(
+                asdict(
+                    AirflowRunFacet(
+                        dag=DagInfo(dag),
+                        dagRun=DagRunInfo(dag_run),
+                        taskInstance=TaskInstanceInfo(task_instance),
+                        task=TaskInfo(task),
+                        taskUuid=task_uuid,
+                    )
+                ),
+                default=str,
+            )
+        )
+    }
+
+
+def import_from_string(path: str):
+    try:
+        module_path, target = path.rsplit(".", 1)
+        module = importlib.import_module(module_path)
+        return getattr(module, target)
+    except Exception as e:
+        log.warning(e)
+        raise ImportError(f"Failed to import {path}") from e
+
+
+def try_import_from_string(path: str):
+    with suppress(ImportError):
+        return import_from_string(path)
+
+
+def redact_with_exclusions(source: Any):
+    """This function redacts sensitive data similar to SecretsMasker in 
Airflow logs.
+    The difference is that MAX_RECURSION_DEPTH is way higher - due to the 
structure of OL events
+    we need more depth.
+    Also, we allow data structures to specify data that needs not to be 
redacted by specifying
+    _skip_redact list.
+    """
+    try:
+        from airflow.utils.log.secrets_masker import (
+            _secrets_masker,
+            should_hide_value_for_key,
+        )
+    except ImportError:
+        return source
+
+    sm = _secrets_masker()
+    MAX_RECURSION_DEPTH = 20
+
+    def _redact(item, name: str | None, depth: int):
+        if depth > MAX_RECURSION_DEPTH:
+            return item

Review Comment:
   I'd really love to find a way that doesn't end up with a second copy of this
code -- especially given this is a security-like control.
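
   One way to avoid the duplication might be to add a `max_depth` argument to
   `SecretsMasker.redact()` in core Airflow and have this module just delegate to it
   (the `_skip_redact` handling would still need a thin wrapper). Hypothetical sketch,
   assuming such an argument existed:

   ```python
   from airflow.utils.log.secrets_masker import _secrets_masker


   def redact_with_exclusions(source):
       # Hypothetical: relies on SecretsMasker.redact() growing a max_depth
       # parameter upstream, so the traversal logic lives in exactly one place.
       return _secrets_masker().redact(source, max_depth=20)
   ```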



##########
tests/providers/openlineage/conftest.py:
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Copyright 2018-2023 contributors to the OpenLineage project
+# SPDX-License-Identifier: Apache-2.0
+from __future__ import annotations
+
+import logging
+import os
+from unittest.mock import patch
+
+import pytest
+
+log = logging.getLogger(__name__)
+
+
[email protected](scope="function")
+def remove_redshift_conn():
+    if "REDSHIFT_CONN" in os.environ:
+        del os.environ["REDSHIFT_CONN"]
+    if "WRITE_SCHEMA" in os.environ:
+        del os.environ["WRITE_SCHEMA"]
+
+
[email protected](scope="function")
+def we_module_env():
+    os.environ["REDSHIFT_CONN"] = "postgresql://user:[email protected]:1234/db"
+    os.environ["WRITE_SCHEMA"] = "testing"
+
+
[email protected](scope="function")
+def dagbag():
+    log.debug("dagbag()")
+    os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = "sqlite://"
+    os.environ["OPENLINEAGE_NAMESPACE"] = "test-marquez"
+
+    import airflow.utils.db as db_utils
+    from airflow import settings
+
+    db_utils.resetdb(settings.RBAC)

Review Comment:
   This function is defined as
   ```
   def resetdb(session: Session = NEW_SESSION, skip_init: bool = False):
   ```
   
   `settings.RBAC` hasn't existed for a long time either (since 2.0, I think?!).
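
   So presumably just:
   ```suggestion
       db_utils.resetdb()
   ```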



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
