This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7f1d54ad4b1 chore: remove deprecated bigquery facets from OpenLineage utils (#44838)
7f1d54ad4b1 is described below
commit 7f1d54ad4b14a4b255b20dfb162ee3667341d6f6
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Dec 11 14:33:50 2024 +0100
chore: remove deprecated bigquery facets from OpenLineage utils (#44838)
Signed-off-by: Kacper Muda <[email protected]>
---
.../cloud/openlineage/BigQueryErrorRunFacet.json | 30 ----------------------
.../providers/google/cloud/openlineage/mixins.py | 14 +++-------
.../providers/google/cloud/openlineage/utils.py | 22 ----------------
.../tests/google/cloud/openlineage/test_mixins.py | 6 -----
.../tests/google/cloud/operators/test_bigquery.py | 1 -
5 files changed, 3 insertions(+), 70 deletions(-)
diff --git a/providers/src/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json b/providers/src/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json
deleted file mode 100644
index 3213f9b8b2d..00000000000
--- a/providers/src/airflow/providers/google/cloud/openlineage/BigQueryErrorRunFacet.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
- "$schema": "https://json-schema.org/draft/2020-12/schema",
- "$defs": {
- "BigQueryErrorRunFacet": {
- "allOf": [
- {
- "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
- },
- {
- "type": "object",
- "properties": {
- "clientError": {
- "type": "string"
- },
- "parserError": {
- "type": "string"
- }
- }
- }
- ],
- "type": "object"
- }
- },
- "type": "object",
- "properties": {
- "bigQuery_error": {
- "$ref": "#/$defs/BigQueryErrorRunFacet"
- }
- }
- }
diff --git a/providers/src/airflow/providers/google/cloud/openlineage/mixins.py b/providers/src/airflow/providers/google/cloud/openlineage/mixins.py
index df8a0875f8c..ce7a14e03ae 100644
--- a/providers/src/airflow/providers/google/cloud/openlineage/mixins.py
+++ b/providers/src/airflow/providers/google/cloud/openlineage/mixins.py
@@ -108,10 +108,7 @@ class _BigQueryOpenLineageMixin:
def get_facets(self, job_id: str):
from airflow.providers.common.compat.openlineage.facet import ErrorMessageRunFacet
- from airflow.providers.google.cloud.openlineage.utils import (
- BigQueryErrorRunFacet,
- get_from_nullable_chain,
- )
+ from airflow.providers.google.cloud.openlineage.utils import get_from_nullable_chain
inputs = []
outputs = []
@@ -125,8 +122,7 @@ class _BigQueryOpenLineageMixin:
if get_from_nullable_chain(props, ["status", "state"]) != "DONE":
raise ValueError(f"Trying to extract data from running bigquery job: `{job_id}`")
- # TODO: remove bigQuery_job in next release
- run_facets["bigQuery_job"] = run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props)
+ run_facets["bigQueryJob"] = self._get_bigquery_job_run_facet(props)
if get_from_nullable_chain(props, ["statistics", "numChildJobs"]):
if hasattr(self, "log"):
@@ -145,16 +141,12 @@ class _BigQueryOpenLineageMixin:
if hasattr(self, "log"):
self.log.warning("Cannot retrieve job details from BigQuery.Client. %s", e, exc_info=True)
exception_msg = traceback.format_exc()
- # TODO: remove BigQueryErrorRunFacet in next release
run_facets.update(
{
"errorMessage": ErrorMessageRunFacet(
message=f"{e}: {exception_msg}",
programmingLanguage="python",
- ),
- "bigQuery_error": BigQueryErrorRunFacet(
- clientError=f"{e}: {exception_msg}",
- ),
+ )
}
)
deduplicated_outputs = self._deduplicate_outputs(outputs)
diff --git a/providers/src/airflow/providers/google/cloud/openlineage/utils.py b/providers/src/airflow/providers/google/cloud/openlineage/utils.py
index a8989b4eb8f..d7852b13ecb 100644
--- a/providers/src/airflow/providers/google/cloud/openlineage/utils.py
+++ b/providers/src/airflow/providers/google/cloud/openlineage/utils.py
@@ -218,28 +218,6 @@ class BigQueryJobRunFacet(RunFacet):
)
-# TODO: remove BigQueryErrorRunFacet in next release
-@define
-class BigQueryErrorRunFacet(RunFacet):
- """
- Represents errors that can happen during execution of BigqueryExtractor.
-
- :param clientError: represents errors originating in bigquery client
- :param parserError: represents errors that happened during parsing SQL provided to bigquery
- """
-
- clientError: str | None = field(default=None)
- parserError: str | None = field(default=None)
-
- @staticmethod
- def _get_schema() -> str:
- return (
- "https://raw.githubusercontent.com/apache/airflow/"
- f"providers-google/{provider_version}/airflow/providers/google/"
- "openlineage/BigQueryErrorRunFacet.json"
- )
-
-
def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
"""
Get object from nested structure of objects, where it's not guaranteed that all keys in the nested structure exist.
diff --git a/providers/tests/google/cloud/openlineage/test_mixins.py b/providers/tests/google/cloud/openlineage/test_mixins.py
index 41e4a22ee3f..fb047ddc2d1 100644
--- a/providers/tests/google/cloud/openlineage/test_mixins.py
+++ b/providers/tests/google/cloud/openlineage/test_mixins.py
@@ -83,9 +83,6 @@ class TestBigQueryOpenLineageMixin:
self.job_details["configuration"]["query"].pop("query")
assert lineage.run_facets == {
- "bigQuery_job": BigQueryJobRunFacet(
- cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
- ),
"bigQueryJob": BigQueryJobRunFacet(
cached=False, billedBytes=111149056, properties=json.dumps(self.job_details)
),
@@ -136,9 +133,6 @@ class TestBigQueryOpenLineageMixin:
"bigQueryJob": BigQueryJobRunFacet(
cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details)
),
- "bigQuery_job": BigQueryJobRunFacet(
- cached=False, billedBytes=120586240, properties=json.dumps(self.script_job_details)
- ),
"externalQuery": ExternalQueryRunFacet(externalQueryId="job_id",
source="bigquery"),
}
assert lineage.inputs == [
diff --git a/providers/tests/google/cloud/operators/test_bigquery.py b/providers/tests/google/cloud/operators/test_bigquery.py
index 29f3a8db13e..ab89443a69e 100644
--- a/providers/tests/google/cloud/operators/test_bigquery.py
+++ b/providers/tests/google/cloud/operators/test_bigquery.py
@@ -1614,7 +1614,6 @@ class TestBigQueryInsertJobOperator:
]
assert lineage.run_facets == {
- "bigQuery_job": mock.ANY,
"bigQueryJob": mock.ANY,
"externalQuery": ExternalQueryRunFacet(externalQueryId=mock.ANY,
source="bigquery"),
}