This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fe1be409573 fix: OpenLineage BaseExtractor's on_failure should call on_complete by default (#48456)
fe1be409573 is described below

commit fe1be4095736244f2f567ec1cd3c4063fb1e87fd
Author: Kacper Muda <[email protected]>
AuthorDate: Thu Mar 27 19:14:59 2025 +0100

    fix: OpenLineage BaseExtractor's on_failure should call on_complete by default (#48456)
---
 .../providers/openlineage/extractors/base.py       |  2 +-
 .../tests/unit/openlineage/extractors/test_base.py | 83 ++++++++++++++++++++--
 2 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py b/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py
index b5f8a93f20d..435a365e804 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/extractors/base.py
@@ -84,7 +84,7 @@ class BaseExtractor(ABC, LoggingMixin):
         return self.extract()
 
     def extract_on_failure(self, task_instance) -> OperatorLineage | None:
-        return self.extract()
+        return self.extract_on_complete(task_instance)
 
 
 class DefaultExtractor(BaseExtractor):
diff --git a/providers/openlineage/tests/unit/openlineage/extractors/test_base.py b/providers/openlineage/tests/unit/openlineage/extractors/test_base.py
index a120537a414..eb247567511 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_base.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_base.py
@@ -65,10 +65,54 @@ FINISHED_FACETS: dict[str, JobFacet] = {"complete": CompleteRunFacet(True)}
 FAILED_FACETS: dict[str, JobFacet] = {"failure": FailRunFacet(True)}
 
 
-class ExampleExtractor(BaseExtractor):
+class ExtractorWithoutExecuteOnFailure(BaseExtractor):
     @classmethod
     def get_operator_classnames(cls):
-        return ["OperatorWithoutFailure"]
+        return ["SimpleCustomOperator"]
+
+    def _execute_extraction(self) -> OperatorLineage | None:
+        return OperatorLineage(
+            inputs=INPUTS,
+            outputs=OUTPUTS,
+            run_facets=RUN_FACETS,
+            job_facets=JOB_FACETS,
+        )
+
+    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
+        return OperatorLineage(
+            inputs=INPUTS,
+            outputs=OUTPUTS,
+            run_facets=RUN_FACETS,
+            job_facets=FINISHED_FACETS,
+        )
+
+
+class ExtractorWithExecuteExtractionOnly(BaseExtractor):
+    @classmethod
+    def get_operator_classnames(cls):
+        return ["AnotherOperator"]
+
+    def _execute_extraction(self) -> OperatorLineage | None:
+        return OperatorLineage(
+            inputs=INPUTS,
+            outputs=OUTPUTS,
+            run_facets=RUN_FACETS,
+            job_facets=JOB_FACETS,
+        )
+
+
+class SimpleCustomOperator(BaseOperator):
+    def execute(self, context) -> Any:
+        pass
+
+    def get_openlineage_facets_on_start(self) -> OperatorLineage:
+        return OperatorLineage()
+
+    def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
+        return OperatorLineage()
+
+    def get_openlineage_facets_on_failure(self, task_instance) -> OperatorLineage:
+        return OperatorLineage()
 
 
 class OperatorWithoutFailure(BaseOperator):
@@ -321,9 +365,38 @@ def test_extractor_manager_calls_appropriate_extractor_method(
 
 @mock.patch("airflow.providers.openlineage.conf.custom_extractors")
 def test_extractors_env_var(custom_extractors):
-    custom_extractors.return_value = {"unit.openlineage.extractors.test_base.ExampleExtractor"}
-    extractor = ExtractorManager().get_extractor_class(OperatorWithoutFailure(task_id="example"))
-    assert extractor is ExampleExtractor
+    custom_extractors.return_value = {
+        "unit.openlineage.extractors.test_base.ExtractorWithoutExecuteOnFailure"
+    }
+    extractor = ExtractorManager().get_extractor_class(SimpleCustomOperator(task_id="example"))
+    assert extractor is ExtractorWithoutExecuteOnFailure
+
+
+def test_extractor_without_extract_on_failure_calls_extract_on_complete():
+    extractor = ExtractorWithoutExecuteOnFailure(SimpleCustomOperator(task_id="example"))
+    result = extractor.extract_on_failure(None)
+    assert result == OperatorLineage(
+        inputs=INPUTS,
+        outputs=OUTPUTS,
+        run_facets=RUN_FACETS,
+        job_facets=FINISHED_FACETS,
+    )
+
+
+def test_extractor_without_extract_on_complete_and_failure_always_calls_extract():
+    extractor = ExtractorWithExecuteExtractionOnly(SimpleCustomOperator(task_id="example"))
+    expected_result = OperatorLineage(
+        inputs=INPUTS,
+        outputs=OUTPUTS,
+        run_facets=RUN_FACETS,
+        job_facets=JOB_FACETS,
+    )
+    result = extractor.extract_on_failure(None)
+    assert result == expected_result
+    result = extractor.extract_on_complete(None)
+    assert result == expected_result
+    result = extractor.extract()
+    assert result == expected_result
 
 
 def test_does_not_use_default_extractor_when_not_a_method():

Reply via email to