This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 5cb96f6de56 feature: Add OpenLineage support for CloudDataFusionStartPipelineOperator (#56365)
5cb96f6de56 is described below
commit 5cb96f6de56f438b8355d309fd505d84f1079f33
Author: pawelgrochowicz <[email protected]>
AuthorDate: Thu Oct 9 12:15:19 2025 +0200
feature: Add OpenLineage support for CloudDataFusionStartPipelineOperator (#56365)
* feature: Add OpenLineage support for CloudDataFusionStartPipelineOperator
---
.../cloud/openlineage/DataFusionRunFacet.json | 32 ++++++++++++
.../providers/google/cloud/openlineage/facets.py | 21 ++++++++
.../providers/google/cloud/operators/datafusion.py | 43 +++++++++++++---
.../unit/google/cloud/openlineage/test_facets.py | 8 +++
.../unit/google/cloud/operators/test_datafusion.py | 60 ++++++++++++++++++++++
5 files changed, 157 insertions(+), 7 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/openlineage/DataFusionRunFacet.json b/providers/google/src/airflow/providers/google/cloud/openlineage/DataFusionRunFacet.json
new file mode 100644
index 00000000000..4409001d78d
--- /dev/null
+++ b/providers/google/src/airflow/providers/google/cloud/openlineage/DataFusionRunFacet.json
@@ -0,0 +1,32 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$defs": {
+ "DataFusionRunFacet": {
+ "allOf": [
+ {
+ "$ref":
"https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
+ },
+ {
+ "type": "object",
+ "properties": {
+ "runId": {
+ "type": "string",
+ "description": "Pipeline run ID assigned by Cloud Data Fusion."
+ },
+ "runtimeArgs": {
+ "type": "object",
+ "description": "Runtime arguments provided when starting the
pipeline."
+ }
+ }
+ }
+ ],
+ "type": "object"
+ }
+ },
+ "type": "object",
+ "properties": {
+ "dataFusionRun": {
+ "$ref": "#/$defs/DataFusionRunFacet"
+ }
+ }
+}
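
For illustration, a run event conforming to this schema might carry a payload like the sketch below; all values are hypothetical and shown as a Python dict mirroring the schema's two optional properties.

    # Hypothetical facet payload matching DataFusionRunFacet.json above;
    # "runId" and "runtimeArgs" are the only properties the schema defines.
    data_fusion_run_facet = {
        "dataFusionRun": {
            "runId": "abc123",  # run ID assigned by Cloud Data Fusion (made up here)
            "runtimeArgs": {"arg1": "val1"},
        }
    }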
diff --git a/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py b/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py
index 645e8c51b07..443e9c1976e 100644
--- a/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py
+++ b/providers/google/src/airflow/providers/google/cloud/openlineage/facets.py
@@ -131,6 +131,26 @@ try:
"openlineage/CloudStorageTransferRunFacet.json"
)
+ @define
+ class DataFusionRunFacet(RunFacet):
+ """
+ Facet that represents relevant details of a Cloud Data Fusion pipeline run.
+
+ :param runId: The pipeline execution id.
+ :param runtimeArgs: Runtime arguments passed to the pipeline.
+ """
+
+ runId: str | None = field(default=None)
+ runtimeArgs: dict[str, str] | None = field(default=None)
+
+ @staticmethod
+ def _get_schema() -> str:
+ return (
+ "https://raw.githubusercontent.com/apache/airflow/"
+ f"providers-google/{provider_version}/airflow/providers/google/"
+ "openlineage/DataFusionRunFacet.json"
+ )
+
except ImportError: # OpenLineage is not available
def create_no_op(*_, **__) -> None:
@@ -145,3 +165,4 @@ except ImportError: # OpenLineage is not available
BigQueryJobRunFacet = create_no_op # type: ignore[misc, assignment]
CloudStorageTransferJobFacet = create_no_op # type: ignore[misc, assignment]
CloudStorageTransferRunFacet = create_no_op # type: ignore[misc, assignment]
+ DataFusionRunFacet = create_no_op # type: ignore[misc, assignment]
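
The try/except structure above means DataFusionRunFacet always exists as a name: when OpenLineage is importable it is a real facet class, otherwise it is rebound to a no-op factory so callers can construct it unconditionally. A simplified sketch of that pattern follows (not the actual module; the import path is an assumption based on the surrounding diff):

    # Simplified sketch of the import-guard pattern used in facets.py.
    try:
        # Assumed import path; the real module may import RunFacet differently.
        from airflow.providers.common.compat.openlineage.facet import RunFacet

        class DataFusionRunFacet(RunFacet):
            ...
    except ImportError:  # OpenLineage is not available
        def create_no_op(*_, **__) -> None:
            return None

        DataFusionRunFacet = create_no_op  # type: ignore[misc, assignment]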
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py
index 572faa35fac..cfe5e7965fb 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py
@@ -40,6 +40,7 @@ from airflow.providers.google.cloud.utils.helpers import resource_path_to_dict
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
if TYPE_CHECKING:
+ from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.context import Context
@@ -777,6 +778,7 @@ class CloudDataFusionStartPipelineOperator(GoogleCloudBaseOperator):
self.pipeline_timeout = pipeline_timeout
self.deferrable = deferrable
self.poll_interval = poll_interval
+ self.pipeline_id: str | None = None
if success_states:
self.success_states = success_states
@@ -796,14 +798,14 @@ class CloudDataFusionStartPipelineOperator(GoogleCloudBaseOperator):
project_id=self.project_id,
)
api_url = instance["apiEndpoint"]
- pipeline_id = hook.start_pipeline(
+ self.pipeline_id = hook.start_pipeline(
pipeline_name=self.pipeline_name,
pipeline_type=self.pipeline_type,
instance_url=api_url,
namespace=self.namespace,
runtime_args=self.runtime_args,
)
- self.log.info("Pipeline %s submitted successfully.", pipeline_id)
+ self.log.info("Pipeline %s submitted successfully.", self.pipeline_id)
DataFusionPipelineLink.persist(
context=context,
@@ -824,7 +826,7 @@ class CloudDataFusionStartPipelineOperator(GoogleCloudBaseOperator):
namespace=self.namespace,
pipeline_name=self.pipeline_name,
pipeline_type=self.pipeline_type.value,
- pipeline_id=pipeline_id,
+ pipeline_id=self.pipeline_id,
poll_interval=self.poll_interval,
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
@@ -834,19 +836,21 @@ class CloudDataFusionStartPipelineOperator(GoogleCloudBaseOperator):
else:
if not self.asynchronous:
# when NOT using asynchronous mode it will just wait for pipeline to finish and print message
- self.log.info("Waiting when pipeline %s will be in one of the
success states", pipeline_id)
+ self.log.info(
+ "Waiting when pipeline %s will be in one of the success states", self.pipeline_id
+ )
hook.wait_for_pipeline_state(
success_states=self.success_states,
- pipeline_id=pipeline_id,
+ pipeline_id=self.pipeline_id,
pipeline_name=self.pipeline_name,
pipeline_type=self.pipeline_type,
namespace=self.namespace,
instance_url=api_url,
timeout=self.pipeline_timeout,
)
- self.log.info("Pipeline %s discovered success state.",
pipeline_id)
+ self.log.info("Pipeline %s discovered success state.",
self.pipeline_id)
# otherwise, return pipeline_id so that sensor can use it later to check the pipeline state
- return pipeline_id
+ return self.pipeline_id
def execute_complete(self, context: Context, event: dict[str, Any]):
"""
@@ -863,6 +867,31 @@ class CloudDataFusionStartPipelineOperator(GoogleCloudBaseOperator):
)
return event["pipeline_id"]
+ def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage | None:
+ """Build and return OpenLineage facets and datasets for the completed pipeline start."""
+ from airflow.providers.common.compat.openlineage.facet import Dataset
+ from airflow.providers.google.cloud.openlineage.facets import DataFusionRunFacet
+ from airflow.providers.openlineage.extractors import OperatorLineage
+
+ pipeline_resource = f"{self.project_id}:{self.location}:{self.instance_name}:{self.pipeline_name}"
+
+ inputs = [Dataset(namespace="datafusion", name=pipeline_resource)]
+
+ if self.pipeline_id:
+ output_name = f"{pipeline_resource}:{self.pipeline_id}"
+ else:
+ output_name = f"{pipeline_resource}:unknown"
+ outputs = [Dataset(namespace="datafusion", name=output_name)]
+
+ run_facets = {
+ "dataFusionRun": DataFusionRunFacet(
+ runId=self.pipeline_id,
+ runtimeArgs=self.runtime_args,
+ )
+ }
+
+ return OperatorLineage(inputs=inputs, outputs=outputs, run_facets=run_facets, job_facets={})
+
class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator):
"""
diff --git a/providers/google/tests/unit/google/cloud/openlineage/test_facets.py b/providers/google/tests/unit/google/cloud/openlineage/test_facets.py
index 0aa1e4feebc..be32909165f 100644
--- a/providers/google/tests/unit/google/cloud/openlineage/test_facets.py
+++ b/providers/google/tests/unit/google/cloud/openlineage/test_facets.py
@@ -20,6 +20,7 @@ from airflow.providers.google.cloud.openlineage.facets import (
BigQueryJobRunFacet,
CloudStorageTransferJobFacet,
CloudStorageTransferRunFacet,
+ DataFusionRunFacet,
)
@@ -80,3 +81,10 @@ def test_cloud_storage_transfer_run_facet():
assert facet.timeout == 3600
assert facet.deferrable is False
assert facet.deleteJobAfterCompletion is True
+
+
+def test_datafusion_run_facet():
+ facet = DataFusionRunFacet(runId="abc123", runtimeArgs={"arg1": "val1"})
+
+ assert facet.runId == "abc123"
+ assert facet.runtimeArgs == {"arg1": "val1"}
diff --git a/providers/google/tests/unit/google/cloud/operators/test_datafusion.py b/providers/google/tests/unit/google/cloud/operators/test_datafusion.py
index 3aca8a37913..74cf94d1ce3 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_datafusion.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_datafusion.py
@@ -23,6 +23,7 @@ import pytest
from airflow import DAG
from airflow.exceptions import AirflowException, TaskDeferred
from airflow.providers.google.cloud.hooks.datafusion import SUCCESS_STATES, PipelineStates
+from airflow.providers.google.cloud.openlineage.facets import DataFusionRunFacet
from airflow.providers.google.cloud.operators.datafusion import (
CloudDataFusionCreateInstanceOperator,
CloudDataFusionCreatePipelineOperator,
@@ -412,6 +413,65 @@ class TestCloudDataFusionStartPipelineOperatorAsync:
):
op.execute(context=mock.MagicMock())
+ @pytest.mark.parametrize(
+ "pipeline_id, runtime_args, expected_run_id, expected_runtime_args,
expected_output_suffix",
+ [
+ ("abc123", {"arg1": "val1"}, "abc123", {"arg1": "val1"}, "abc123"),
+ (None, None, None, None, "unknown"),
+ ],
+ )
+ @mock.patch("airflow.providers.google.cloud.operators.datafusion.DataFusionPipelineLink.persist")
+ @mock.patch(HOOK_STR)
+ def test_openlineage_facets_with_mock(
+ self,
+ mock_hook,
+ mock_persist,
+ pipeline_id,
+ runtime_args,
+ expected_run_id,
+ expected_runtime_args,
+ expected_output_suffix,
+ ):
+ mock_persist.return_value = None
+
+ mock_instance = {"apiEndpoint": "https://mock-endpoint", "serviceEndpoint": "https://mock-service"}
+ mock_hook.return_value.get_instance.return_value = mock_instance
+ mock_hook.return_value.start_pipeline.return_value = pipeline_id
+
+ op = CloudDataFusionStartPipelineOperator(
+ task_id=TASK_ID,
+ pipeline_name=PIPELINE_NAME,
+ instance_name=INSTANCE_NAME,
+ namespace=NAMESPACE,
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ runtime_args=runtime_args,
+ )
+
+ result_pipeline_id = op.execute(context={})
+ results = op.get_openlineage_facets_on_complete(task_instance=None)
+
+ assert result_pipeline_id == pipeline_id
+ assert op.pipeline_id == pipeline_id
+
+ expected_input_name = f"{PROJECT_ID}:{LOCATION}:{INSTANCE_NAME}:{PIPELINE_NAME}"
+
+ assert results is not None
+ assert len(results.inputs) == 1
+ assert results.inputs[0].namespace == "datafusion"
+ assert results.inputs[0].name == expected_input_name
+
+ assert len(results.outputs) == 1
+ assert results.outputs[0].namespace == "datafusion"
+ assert results.outputs[0].name == f"{expected_input_name}:{expected_output_suffix}"
+
+ facet = results.run_facets["dataFusionRun"]
+ assert isinstance(facet, DataFusionRunFacet)
+ assert facet.runId == expected_run_id
+ assert facet.runtimeArgs == expected_runtime_args
+
+ assert results.job_facets == {}
+
class TestCloudDataFusionStopPipelineOperator:
@mock.patch(HOOK_STR)