This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fe1be409573 fix: OpenLineage BaseExtractor's on_failure should call on_complete by default (#48456)
fe1be409573 is described below
commit fe1be4095736244f2f567ec1cd3c4063fb1e87fd
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Mar 27 19:14:59 2025 +0100
fix: OpenLineage BaseExtractor's on_failure should call on_complete by default (#48456)
---
.../providers/openlineage/extractors/base.py | 2 +-
.../tests/unit/openlineage/extractors/test_base.py | 83 ++++++++++++++++++++--
2 files changed, 79 insertions(+), 6 deletions(-)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py b/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py
index b5f8a93f20d..435a365e804 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py
@@ -84,7 +84,7 @@ class BaseExtractor(ABC, LoggingMixin):
return self.extract()
def extract_on_failure(self, task_instance) -> OperatorLineage | None:
- return self.extract()
+ return self.extract_on_complete(task_instance)
class DefaultExtractor(BaseExtractor):
diff --git a/providers/openlineage/tests/unit/openlineage/extractors/test_base.py b/providers/openlineage/tests/unit/openlineage/extractors/test_base.py
index a120537a414..eb247567511 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_base.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_base.py
@@ -65,10 +65,54 @@ FINISHED_FACETS: dict[str, JobFacet] = {"complete": CompleteRunFacet(True)}
FAILED_FACETS: dict[str, JobFacet] = {"failure": FailRunFacet(True)}
-class ExampleExtractor(BaseExtractor):
+class ExtractorWithoutExecuteOnFailure(BaseExtractor):
@classmethod
def get_operator_classnames(cls):
- return ["OperatorWithoutFailure"]
+ return ["SimpleCustomOperator"]
+
+ def _execute_extraction(self) -> OperatorLineage | None:
+ return OperatorLineage(
+ inputs=INPUTS,
+ outputs=OUTPUTS,
+ run_facets=RUN_FACETS,
+ job_facets=JOB_FACETS,
+ )
+
+ def extract_on_complete(self, task_instance) -> OperatorLineage | None:
+ return OperatorLineage(
+ inputs=INPUTS,
+ outputs=OUTPUTS,
+ run_facets=RUN_FACETS,
+ job_facets=FINISHED_FACETS,
+ )
+
+
+class ExtractorWithExecuteExtractionOnly(BaseExtractor):
+ @classmethod
+ def get_operator_classnames(cls):
+ return ["AnotherOperator"]
+
+ def _execute_extraction(self) -> OperatorLineage | None:
+ return OperatorLineage(
+ inputs=INPUTS,
+ outputs=OUTPUTS,
+ run_facets=RUN_FACETS,
+ job_facets=JOB_FACETS,
+ )
+
+
+class SimpleCustomOperator(BaseOperator):
+ def execute(self, context) -> Any:
+ pass
+
+ def get_openlineage_facets_on_start(self) -> OperatorLineage:
+ return OperatorLineage()
+
+ def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
+ return OperatorLineage()
+
+ def get_openlineage_facets_on_failure(self, task_instance) -> OperatorLineage:
+ return OperatorLineage()
class OperatorWithoutFailure(BaseOperator):
@@ -321,9 +365,38 @@ def test_extractor_manager_calls_appropriate_extractor_method(
@mock.patch("airflow.providers.openlineage.conf.custom_extractors")
def test_extractors_env_var(custom_extractors):
- custom_extractors.return_value = {"unit.openlineage.extractors.test_base.ExampleExtractor"}
- extractor = ExtractorManager().get_extractor_class(OperatorWithoutFailure(task_id="example"))
- assert extractor is ExampleExtractor
+ custom_extractors.return_value = {
+ "unit.openlineage.extractors.test_base.ExtractorWithoutExecuteOnFailure"
+ }
+ extractor = ExtractorManager().get_extractor_class(SimpleCustomOperator(task_id="example"))
+ assert extractor is ExtractorWithoutExecuteOnFailure
+
+
+def test_extractor_without_extract_on_failure_calls_extract_on_complete():
+ extractor = ExtractorWithoutExecuteOnFailure(SimpleCustomOperator(task_id="example"))
+ result = extractor.extract_on_failure(None)
+ assert result == OperatorLineage(
+ inputs=INPUTS,
+ outputs=OUTPUTS,
+ run_facets=RUN_FACETS,
+ job_facets=FINISHED_FACETS,
+ )
+
+
+def test_extractor_without_extract_on_complete_and_failure_always_calls_extract():
+ extractor = ExtractorWithExecuteExtractionOnly(SimpleCustomOperator(task_id="example"))
+ expected_result = OperatorLineage(
+ inputs=INPUTS,
+ outputs=OUTPUTS,
+ run_facets=RUN_FACETS,
+ job_facets=JOB_FACETS,
+ )
+ result = extractor.extract_on_failure(None)
+ assert result == expected_result
+ result = extractor.extract_on_complete(None)
+ assert result == expected_result
+ result = extractor.extract()
+ assert result == expected_result
def test_does_not_use_default_extractor_when_not_a_method():