This is an automated email from the ASF dual-hosted git repository.
basph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3bb0598661b Prefer BigQueryInsertJobOperator's project_id over hook's project_id for openlineage (#55948)
3bb0598661b is described below
commit 3bb0598661b2e2f4d124256c1d5c0f81ccd1d52d
Author: Bas Harenslak <[email protected]>
AuthorDate: Sat Oct 4 06:12:54 2025 +0200
Prefer BigQueryInsertJobOperator's project_id over hook's project_id for openlineage (#55948)
* Prefer operator's project_id over hook's project_id
* Remove unused import
* Formatting
* Remove unnecessary arguments
* Make mypy happy
---
.../providers/google/cloud/openlineage/mixins.py | 4 ++-
.../unit/google/cloud/openlineage/test_mixins.py | 34 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py b/providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py
index 5eba61b1886..539c24f6533 100644
--- a/providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py
+++ b/providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py
@@ -97,7 +97,9 @@ class _BigQueryInsertJobOperatorOpenLineageMixin:
         run_facets: dict[str, RunFacet] = {
             "externalQuery": ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery")
         }
-        self._client = self.hook.get_client(project_id=self.hook.project_id, location=self.location)
+        self._client = self.hook.get_client(
+            project_id=self.project_id or self.hook.project_id, location=self.location
+        )
         try:
             job_properties = self._client.get_job(job_id=self.job_id)._properties
diff --git a/providers/google/tests/unit/google/cloud/openlineage/test_mixins.py b/providers/google/tests/unit/google/cloud/openlineage/test_mixins.py
index 7006d2f9130..e7204368fd5 100644
--- a/providers/google/tests/unit/google/cloud/openlineage/test_mixins.py
+++ b/providers/google/tests/unit/google/cloud/openlineage/test_mixins.py
@@ -1013,3 +1013,37 @@ class TestBigQueryOpenLineageMixin:
),
}
)
+
+    def test_project_id_selection(self):
+        """
+        Check if project_id set via an argument to the operator takes precedence over project_id set in a
+        connection.
+        """
+        from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
+
+        class TestOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOperatorOpenLineageMixin):
+            def __init__(self, project_id: str | None = None, **_):
+                self.project_id = project_id
+                self.job_id = "foobar"
+                self.location = "foobar"
+                self.sql = "foobar"
+
+        # First test task where project_id is set explicitly
+        test = TestOperator(project_id="project_a")
+        test.hook = MagicMock()
+        test.hook.project_id = "project_b"
+        test._client = MagicMock()
+
+        test.get_openlineage_facets_on_complete(None)
+        _, kwargs = test.hook.get_client.call_args
+        assert kwargs["project_id"] == "project_a"
+
+        # Then test task where project_id is inherited from the hook
+        test = TestOperator()
+        test.hook = MagicMock()
+        test.hook.project_id = "project_b"
+        test._client = MagicMock()
+
+        test.get_openlineage_facets_on_complete(None)
+        _, kwargs = test.hook.get_client.call_args
+        assert kwargs["project_id"] == "project_b"