This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ef0ed1aacc Fix local OpenLineage import in `SQLExecuteQueryOperator`. (#32400)
ef0ed1aacc is described below
commit ef0ed1aacc208be9e52a35211d2beaefb735173a
Author: JDarDagran <[email protected]>
AuthorDate: Thu Jul 6 14:20:25 2023 +0200
Fix local OpenLineage import in `SQLExecuteQueryOperator`. (#32400)
Signed-off-by: Jakub Dardzinski <[email protected]>
---
airflow/providers/common/sql/operators/sql.py | 6 +++---
tests/providers/common/sql/operators/test_sql_execute.py | 15 +++++++++++++++
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py
index 53b499f506..f1784f618e 100644
--- a/airflow/providers/common/sql/operators/sql.py
+++ b/airflow/providers/common/sql/operators/sql.py
@@ -330,12 +330,12 @@ class SQLExecuteQueryOperator(BaseSQLOperator):
return operator_lineage
def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage | None:
- operator_lineage = self.get_openlineage_facets_on_start() or OperatorLineage()
-
try:
from airflow.providers.openlineage.extractors import OperatorLineage
except ImportError:
- return operator_lineage
+ return None
+
+ operator_lineage = self.get_openlineage_facets_on_start() or OperatorLineage()
hook = self.get_db_hook()
try:
diff --git a/tests/providers/common/sql/operators/test_sql_execute.py b/tests/providers/common/sql/operators/test_sql_execute.py
index 48f59e0ffe..ddd1372485 100644
--- a/tests/providers/common/sql/operators/test_sql_execute.py
+++ b/tests/providers/common/sql/operators/test_sql_execute.py
@@ -18,6 +18,7 @@
from __future__ import annotations
from typing import Any, NamedTuple, Sequence
+from unittest import mock
from unittest.mock import MagicMock
import pytest
@@ -354,3 +355,17 @@ FORGOT TO COMMENT"""
)
== lineage_on_complete
)
+
+
+def test_with_no_openlineage_provider():
+ import importlib
+
+ def mock__import__(name, globals_=None, locals_=None, fromlist=(), level=0):
+ if level == 0 and name.startswith("airflow.providers.openlineage"):
+ raise ImportError("No provider 'apache-airflow-providers-openlineage'")
+ return importlib.__import__(name, globals=globals_, locals=locals_, fromlist=fromlist, level=level)
+
+ with mock.patch("builtins.__import__", side_effect=mock__import__):
+ op = SQLExecuteQueryOperator(task_id=TASK_ID, sql="SELECT 1;")
+ assert op.get_openlineage_facets_on_start() is None
+ assert op.get_openlineage_facets_on_complete(None) is None