This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ce16963e9d Add OpenLineage support to BigQueryToGCSOperator (#35660)
ce16963e9d is described below
commit ce16963e9d69849309aa0a7cf978ed85ab741439
Author: Kacper Muda <[email protected]>
AuthorDate: Fri Nov 17 15:52:40 2023 +0100
Add OpenLineage support to BigQueryToGCSOperator (#35660)
---
.../google/cloud/transfers/bigquery_to_gcs.py | 75 +++++-
.../providers/google/cloud/utils/openlineage.py | 80 ++++++
docs/spelling_wordlist.txt | 1 +
.../google/cloud/transfers/test_bigquery_to_gcs.py | 274 +++++++++++++++++++++
.../google/cloud/utils/test_openlineage.py | 142 +++++++++++
5 files changed, 571 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index a01c564cc3..58456b10f9 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -29,6 +29,7 @@ from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook,
BigQueryJob
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
from airflow.providers.google.cloud.triggers.bigquery import
BigQueryInsertJobTrigger
+from airflow.utils.helpers import merge_dicts
if TYPE_CHECKING:
from google.api_core.retry import Retry
@@ -139,6 +140,8 @@ class BigQueryToGCSOperator(BaseOperator):
self.hook: BigQueryHook | None = None
self.deferrable = deferrable
+ self._job_id: str = ""
+
@staticmethod
def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
if job.error_result:
@@ -240,6 +243,7 @@ class BigQueryToGCSOperator(BaseOperator):
f"Or, if you want to reattach in this scenario add
{job.state} to `reattach_states`"
)
+ self._job_id = job.job_id
conf = job.to_api_repr()["configuration"]["extract"]["sourceTable"]
dataset_id, project_id, table_id = conf["datasetId"],
conf["projectId"], conf["tableId"]
BigQueryTableLink.persist(
@@ -255,7 +259,7 @@ class BigQueryToGCSOperator(BaseOperator):
timeout=self.execution_timeout,
trigger=BigQueryInsertJobTrigger(
conn_id=self.gcp_conn_id,
- job_id=job_id,
+ job_id=self._job_id,
project_id=self.project_id or self.hook.project_id,
),
method_name="execute_complete",
@@ -276,3 +280,72 @@ class BigQueryToGCSOperator(BaseOperator):
self.task_id,
event["message"],
)
+
    def get_openlineage_facets_on_complete(self, task_instance):
        """
        Build OpenLineage facets after the transfer has finished.

        Implemented on_complete (not on_start) so the final BigQuery job id,
        captured into ``self._job_id`` during ``execute``, can be reported as
        an ``externalQuery`` run facet.

        Returns an ``OperatorLineage`` with:
          - one input dataset for the BigQuery source table, and
          - one output dataset per destination GCS URI (sorted for determinism).
        """
        from pathlib import Path

        from openlineage.client.facet import (
            ExternalQueryRunFacet,
            SymlinksDatasetFacet,
            SymlinksDatasetFacetIdentifiers,
        )
        from openlineage.client.run import Dataset

        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
        from airflow.providers.google.cloud.utils.openlineage import (
            get_facets_from_bq_table,
            get_identity_column_lineage_facet,
        )
        from airflow.providers.openlineage.extractors import OperatorLineage

        # Fetch the live table object so schema/description facets reflect the
        # actual source table at completion time.
        table_object = self.hook.get_client(self.hook.project_id).get_table(self.source_project_dataset_table)

        input_dataset = Dataset(
            namespace="bigquery",
            name=str(table_object.reference),
            facets=get_facets_from_bq_table(table_object),
        )

        # Every output file shares the source schema and a 1:1 (identity)
        # column lineage back to the BigQuery table.
        output_dataset_facets = {
            "schema": input_dataset.facets["schema"],
            "columnLineage": get_identity_column_lineage_facet(
                field_names=[field.name for field in table_object.schema], input_datasets=[input_dataset]
            ),
        }
        output_datasets = []
        for uri in sorted(self.destination_cloud_storage_uris):
            bucket, blob = _parse_gcs_url(uri)
            additional_facets = {}

            if "*" in blob:
                # If wildcard ("*") is used in the GCS path, the dataset name is
                # the containing directory, with a symlink facet pointing at the
                # full wildcard object path.
                additional_facets = {
                    "symlink": SymlinksDatasetFacet(
                        identifiers=[
                            SymlinksDatasetFacetIdentifiers(
                                namespace=f"gs://{bucket}", name=blob, type="file"
                            )
                        ]
                    ),
                }
                blob = Path(blob).parent.as_posix()
                if blob == ".":
                    # Blob paths have no leading slash; a bucket-root dataset is
                    # named "/" by convention.
                    blob = "/"

            dataset = Dataset(
                namespace=f"gs://{bucket}",
                name=blob,
                facets=merge_dicts(output_dataset_facets, additional_facets),
            )
            output_datasets.append(dataset)

        run_facets = {}
        if self._job_id:
            # Only present when execute actually ran/reattached a BQ job.
            run_facets = {
                "externalQuery": ExternalQueryRunFacet(externalQueryId=self._job_id, source="bigquery"),
            }

        return OperatorLineage(inputs=[input_dataset], outputs=output_datasets, run_facets=run_facets)
diff --git a/airflow/providers/google/cloud/utils/openlineage.py
b/airflow/providers/google/cloud/utils/openlineage.py
new file mode 100644
index 0000000000..3e96fffe5a
--- /dev/null
+++ b/airflow/providers/google/cloud/utils/openlineage.py
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains code related to OpenLineage and lineage extraction."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+from openlineage.client.facet import (
+ ColumnLineageDatasetFacet,
+ ColumnLineageDatasetFacetFieldsAdditional,
+ ColumnLineageDatasetFacetFieldsAdditionalInputFields,
+ DocumentationDatasetFacet,
+ SchemaDatasetFacet,
+ SchemaField,
+)
+
+if TYPE_CHECKING:
+ from google.cloud.bigquery.table import Table
+ from openlineage.client.run import Dataset
+
+
def get_facets_from_bq_table(table: Table) -> dict[Any, Any]:
    """Return OpenLineage dataset facets (schema + documentation) for a BigQuery table."""
    schema_fields = []
    for column in table.schema:
        schema_fields.append(
            SchemaField(name=column.name, type=column.field_type, description=column.description)
        )
    # A table with no description yields an empty documentation facet rather
    # than a None description.
    return {
        "schema": SchemaDatasetFacet(fields=schema_fields),
        "documentation": DocumentationDatasetFacet(description=table.description or ""),
    }
+
+
def get_identity_column_lineage_facet(
    field_names: list[str],
    input_datasets: list[Dataset],
) -> ColumnLineageDatasetFacet:
    """
    Get column lineage facet.

    Builds simple identity lineage: every output column maps to the column of
    the same name in each input dataset, with no transformation applied.
    """
    if field_names and not input_datasets:
        raise ValueError("When providing `field_names` You must provide at least one `input_dataset`.")

    fields = {}
    for column in field_names:
        sources = [
            ColumnLineageDatasetFacetFieldsAdditionalInputFields(
                namespace=source.namespace, name=source.name, field=column
            )
            for source in input_datasets
        ]
        fields[column] = ColumnLineageDatasetFacetFieldsAdditional(
            inputFields=sources,
            transformationType="IDENTITY",
            transformationDescription="identical",
        )
    return ColumnLineageDatasetFacet(fields=fields)
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index c56f81ebaf..65bbc8cb6f 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -272,6 +272,7 @@ codepoints
Colour
colour
colours
+ColumnLineageDatasetFacet
CommandType
comparator
compat
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
index b7bf8bef62..5dd32892ab 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
@@ -22,6 +22,19 @@ from unittest.mock import MagicMock
import pytest
from google.cloud.bigquery.retry import DEFAULT_RETRY
+from google.cloud.bigquery.table import Table
+from openlineage.client.facet import (
+ ColumnLineageDatasetFacet,
+ ColumnLineageDatasetFacetFieldsAdditional,
+ ColumnLineageDatasetFacetFieldsAdditionalInputFields,
+ DocumentationDatasetFacet,
+ ExternalQueryRunFacet,
+ SchemaDatasetFacet,
+ SchemaField,
+ SymlinksDatasetFacet,
+ SymlinksDatasetFacetIdentifiers,
+)
+from openlineage.client.run import Dataset
from airflow.exceptions import TaskDeferred
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import
BigQueryToGCSOperator
@@ -32,6 +45,25 @@ TEST_DATASET = "test-dataset"
TEST_TABLE_ID = "test-table-id"
PROJECT_ID = "test-project-id"
JOB_PROJECT_ID = "job-project-id"
+TEST_BUCKET = "test-bucket"
+TEST_FOLDER = "test-folder"
+TEST_OBJECT_NO_WILDCARD = "file.extension"
+TEST_OBJECT_WILDCARD = "file_*.extension"
+TEST_TABLE_API_REPR = {
+ "tableReference": {"projectId": PROJECT_ID, "datasetId": TEST_DATASET,
"tableId": TEST_TABLE_ID},
+ "description": "Table description.",
+ "schema": {
+ "fields": [
+ {"name": "field1", "type": "STRING", "description": "field1
description"},
+ {"name": "field2", "type": "INTEGER"},
+ ]
+ },
+}
+TEST_TABLE: Table = Table.from_api_repr(TEST_TABLE_API_REPR)
+TEST_EMPTY_TABLE_API_REPR = {
+ "tableReference": {"projectId": PROJECT_ID, "datasetId": TEST_DATASET,
"tableId": TEST_TABLE_ID}
+}
+TEST_EMPTY_TABLE: Table = Table.from_api_repr(TEST_EMPTY_TABLE_API_REPR)
class TestBigQueryToGCSOperator:
@@ -154,3 +186,245 @@ class TestBigQueryToGCSOperator:
retry=DEFAULT_RETRY,
nowait=True,
)
+
    @pytest.mark.parametrize(
        ("gcs_uri", "expected_dataset_name"),
        (
            (
                f"gs://{TEST_BUCKET}/{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}",
                f"{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}",
            ),
            (f"gs://{TEST_BUCKET}/{TEST_OBJECT_NO_WILDCARD}", TEST_OBJECT_NO_WILDCARD),
            (f"gs://{TEST_BUCKET}/{TEST_FOLDER}/{TEST_OBJECT_WILDCARD}", TEST_FOLDER),
            (f"gs://{TEST_BUCKET}/{TEST_OBJECT_WILDCARD}", "/"),
            (f"gs://{TEST_BUCKET}/{TEST_FOLDER}/*", TEST_FOLDER),
        ),
    )
    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook")
    def test_get_openlineage_facets_on_complete_gcs_dataset_name(
        self, mock_hook, gcs_uri, expected_dataset_name
    ):
        """Output dataset name: full blob path without wildcard, parent dir with one, "/" at bucket root."""
        operator = BigQueryToGCSOperator(
            project_id=JOB_PROJECT_ID,
            task_id=TASK_ID,
            source_project_dataset_table=f"{PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
            destination_cloud_storage_uris=[gcs_uri],
        )

        mock_hook.return_value.split_tablename.return_value = (PROJECT_ID, TEST_DATASET, TEST_TABLE_ID)
        operator.execute(context=mock.MagicMock())

        lineage = operator.get_openlineage_facets_on_complete(None)
        assert len(lineage.outputs) == 1
        assert lineage.outputs[0].name == expected_dataset_name
+
+
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook")
+ def test_get_openlineage_facets_on_complete_gcs_multiple_uris(self,
mock_hook):
+ operator = BigQueryToGCSOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+
source_project_dataset_table=f"{PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
+ destination_cloud_storage_uris=[
+
f"gs://{TEST_BUCKET}1/{TEST_FOLDER}1/{TEST_OBJECT_NO_WILDCARD}",
+ f"gs://{TEST_BUCKET}2/{TEST_FOLDER}2/{TEST_OBJECT_WILDCARD}",
+ f"gs://{TEST_BUCKET}3/{TEST_OBJECT_NO_WILDCARD}",
+ f"gs://{TEST_BUCKET}4/{TEST_OBJECT_WILDCARD}",
+ ],
+ )
+
+ mock_hook.return_value.split_tablename.return_value = (PROJECT_ID,
TEST_DATASET, TEST_TABLE_ID)
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.outputs) == 4
+ assert lineage.outputs[0].name ==
f"{TEST_FOLDER}1/{TEST_OBJECT_NO_WILDCARD}"
+ assert lineage.outputs[1].name == f"{TEST_FOLDER}2"
+ assert lineage.outputs[2].name == TEST_OBJECT_NO_WILDCARD
+ assert lineage.outputs[3].name == "/"
+
+
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook")
+ def test_get_openlineage_facets_on_complete_bq_dataset(self, mock_hook):
+ source_project_dataset_table =
f"{PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}"
+
+ expected_input_dataset_facets = {
+ "schema": SchemaDatasetFacet(
+ fields=[
+ SchemaField(name="field1", type="STRING",
description="field1 description"),
+ SchemaField(name="field2", type="INTEGER"),
+ ]
+ ),
+ "documentation": DocumentationDatasetFacet(description="Table
description."),
+ }
+
+ mock_hook.return_value.split_tablename.return_value = (PROJECT_ID,
TEST_DATASET, TEST_TABLE_ID)
+ mock_hook.return_value.get_client.return_value.get_table.return_value
= TEST_TABLE
+
+ operator = BigQueryToGCSOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ source_project_dataset_table=source_project_dataset_table,
+ destination_cloud_storage_uris=["gs://bucket/file"],
+ )
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0] == Dataset(
+ namespace="bigquery",
+ name=source_project_dataset_table,
+ facets=expected_input_dataset_facets,
+ )
+
+
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook")
+ def test_get_openlineage_facets_on_complete_bq_dataset_empty_table(self,
mock_hook):
+ source_project_dataset_table =
f"{PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}"
+
+ expected_input_dataset_facets = {
+ "schema": SchemaDatasetFacet(fields=[]),
+ "documentation": DocumentationDatasetFacet(description=""),
+ }
+
+ mock_hook.return_value.split_tablename.return_value = (PROJECT_ID,
TEST_DATASET, TEST_TABLE_ID)
+ mock_hook.return_value.get_client.return_value.get_table.return_value
= TEST_EMPTY_TABLE
+
+ operator = BigQueryToGCSOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ source_project_dataset_table=source_project_dataset_table,
+ destination_cloud_storage_uris=["gs://bucket/file"],
+ )
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 1
+ assert lineage.inputs[0] == Dataset(
+ namespace="bigquery",
+ name=source_project_dataset_table,
+ facets=expected_input_dataset_facets,
+ )
+
+
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook")
+ def
test_get_openlineage_facets_on_complete_gcs_no_wildcard_empty_table(self,
mock_hook):
+ source_project_dataset_table =
f"{PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}"
+ destination_cloud_storage_uris =
[f"gs://{TEST_BUCKET}/{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}"]
+ real_job_id = "123456_hash"
+ bq_namespace = "bigquery"
+
+ expected_input_facets = {
+ "schema": SchemaDatasetFacet(fields=[]),
+ "documentation": DocumentationDatasetFacet(description=""),
+ }
+
+ expected_output_facets = {
+ "schema": SchemaDatasetFacet(fields=[]),
+ "columnLineage": ColumnLineageDatasetFacet(fields={}),
+ }
+
+ mock_hook.return_value.split_tablename.return_value = (PROJECT_ID,
TEST_DATASET, TEST_TABLE_ID)
+ mock_hook.return_value.insert_job.return_value =
MagicMock(job_id=real_job_id, error_result=False)
+ mock_hook.return_value.get_client.return_value.get_table.return_value
= TEST_EMPTY_TABLE
+
+ operator = BigQueryToGCSOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ source_project_dataset_table=source_project_dataset_table,
+ destination_cloud_storage_uris=destination_cloud_storage_uris,
+ )
+
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 1
+ assert len(lineage.outputs) == 1
+ assert lineage.inputs[0] == Dataset(
+ namespace=bq_namespace, name=source_project_dataset_table,
facets=expected_input_facets
+ )
+ assert lineage.outputs[0] == Dataset(
+ namespace=f"gs://{TEST_BUCKET}",
+ name=f"{TEST_FOLDER}/{TEST_OBJECT_NO_WILDCARD}",
+ facets=expected_output_facets,
+ )
+ assert lineage.run_facets == {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=real_job_id, source=bq_namespace)
+ }
+ assert lineage.job_facets == {}
+
+
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook")
+ def test_get_openlineage_facets_on_complete_gcs_wildcard_full_table(self,
mock_hook):
+ source_project_dataset_table =
f"{PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}"
+ destination_cloud_storage_uris =
[f"gs://{TEST_BUCKET}/{TEST_FOLDER}/{TEST_OBJECT_WILDCARD}"]
+ real_job_id = "123456_hash"
+ bq_namespace = "bigquery"
+
+ schema_facet = SchemaDatasetFacet(
+ fields=[
+ SchemaField(name="field1", type="STRING", description="field1
description"),
+ SchemaField(name="field2", type="INTEGER"),
+ ]
+ )
+ expected_input_facets = {
+ "schema": schema_facet,
+ "documentation": DocumentationDatasetFacet(description="Table
description."),
+ }
+
+ expected_output_facets = {
+ "schema": schema_facet,
+ "columnLineage": ColumnLineageDatasetFacet(
+ fields={
+ "field1": ColumnLineageDatasetFacetFieldsAdditional(
+ inputFields=[
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=bq_namespace,
name=source_project_dataset_table, field="field1"
+ )
+ ],
+ transformationType="IDENTITY",
+ transformationDescription="identical",
+ ),
+ "field2": ColumnLineageDatasetFacetFieldsAdditional(
+ inputFields=[
+
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
+ namespace=bq_namespace,
name=source_project_dataset_table, field="field2"
+ )
+ ],
+ transformationType="IDENTITY",
+ transformationDescription="identical",
+ ),
+ }
+ ),
+ "symlink": SymlinksDatasetFacet(
+ identifiers=[
+ SymlinksDatasetFacetIdentifiers(
+ namespace=f"gs://{TEST_BUCKET}",
+ name=f"{TEST_FOLDER}/{TEST_OBJECT_WILDCARD}",
+ type="file",
+ )
+ ]
+ ),
+ }
+
+ mock_hook.return_value.split_tablename.return_value = (PROJECT_ID,
TEST_DATASET, TEST_TABLE_ID)
+ mock_hook.return_value.insert_job.return_value =
MagicMock(job_id=real_job_id, error_result=False)
+ mock_hook.return_value.get_client.return_value.get_table.return_value
= TEST_TABLE
+
+ operator = BigQueryToGCSOperator(
+ project_id=JOB_PROJECT_ID,
+ task_id=TASK_ID,
+ source_project_dataset_table=source_project_dataset_table,
+ destination_cloud_storage_uris=destination_cloud_storage_uris,
+ )
+
+ operator.execute(context=mock.MagicMock())
+
+ lineage = operator.get_openlineage_facets_on_complete(None)
+ assert len(lineage.inputs) == 1
+ assert len(lineage.outputs) == 1
+ assert lineage.inputs[0] == Dataset(
+ namespace=bq_namespace, name=source_project_dataset_table,
facets=expected_input_facets
+ )
+ assert lineage.outputs[0] == Dataset(
+ namespace=f"gs://{TEST_BUCKET}", name=TEST_FOLDER,
facets=expected_output_facets
+ )
+ assert lineage.run_facets == {
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId=real_job_id, source=bq_namespace)
+ }
+ assert lineage.job_facets == {}
diff --git a/tests/providers/google/cloud/utils/test_openlineage.py
b/tests/providers/google/cloud/utils/test_openlineage.py
new file mode 100644
index 0000000000..608007fa4b
--- /dev/null
+++ b/tests/providers/google/cloud/utils/test_openlineage.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+from google.cloud.bigquery.table import Table
+from openlineage.client.facet import (
+ ColumnLineageDatasetFacet,
+ ColumnLineageDatasetFacetFieldsAdditional,
+ ColumnLineageDatasetFacetFieldsAdditionalInputFields,
+ DocumentationDatasetFacet,
+ SchemaDatasetFacet,
+ SchemaField,
+)
+from openlineage.client.run import Dataset
+
+from airflow.providers.google.cloud.utils import openlineage
+
+TEST_DATASET = "test-dataset"
+TEST_TABLE_ID = "test-table-id"
+TEST_PROJECT_ID = "test-project-id"
+TEST_TABLE_API_REPR = {
+ "tableReference": {"projectId": TEST_PROJECT_ID, "datasetId":
TEST_DATASET, "tableId": TEST_TABLE_ID},
+ "description": "Table description.",
+ "schema": {
+ "fields": [
+ {"name": "field1", "type": "STRING", "description": "field1
description"},
+ {"name": "field2", "type": "INTEGER"},
+ ]
+ },
+}
+TEST_TABLE: Table = Table.from_api_repr(TEST_TABLE_API_REPR)
+TEST_EMPTY_TABLE_API_REPR = {
+ "tableReference": {"projectId": TEST_PROJECT_ID, "datasetId":
TEST_DATASET, "tableId": TEST_TABLE_ID}
+}
+TEST_EMPTY_TABLE: Table = Table.from_api_repr(TEST_EMPTY_TABLE_API_REPR)
+
+
def test_get_facets_from_bq_table():
    """Schema and documentation facets mirror the table's fields and description."""
    expected_facets = {
        "schema": SchemaDatasetFacet(
            fields=[
                SchemaField(name="field1", type="STRING", description="field1 description"),
                SchemaField(name="field2", type="INTEGER"),
            ]
        ),
        "documentation": DocumentationDatasetFacet(description="Table description."),
    }
    result = openlineage.get_facets_from_bq_table(TEST_TABLE)
    assert result == expected_facets
+
+
def test_get_facets_from_empty_bq_table():
    """A table with no schema/description yields empty-but-present facets."""
    expected_facets = {
        "schema": SchemaDatasetFacet(fields=[]),
        "documentation": DocumentationDatasetFacet(description=""),
    }
    result = openlineage.get_facets_from_bq_table(TEST_EMPTY_TABLE)
    assert result == expected_facets
+
+
def test_get_identity_column_lineage_facet_multiple_input_datasets():
    """Every field maps to the same-named field in each input dataset (identity lineage)."""
    field_names = ["field1", "field2"]
    input_datasets = [
        Dataset(namespace="gs://first_bucket", name="dir1"),
        Dataset(namespace="gs://second_bucket", name="dir2"),
    ]
    expected_facet = ColumnLineageDatasetFacet(
        fields={
            "field1": ColumnLineageDatasetFacetFieldsAdditional(
                inputFields=[
                    ColumnLineageDatasetFacetFieldsAdditionalInputFields(
                        namespace="gs://first_bucket",
                        name="dir1",
                        field="field1",
                    ),
                    ColumnLineageDatasetFacetFieldsAdditionalInputFields(
                        namespace="gs://second_bucket",
                        name="dir2",
                        field="field1",
                    ),
                ],
                transformationType="IDENTITY",
                transformationDescription="identical",
            ),
            "field2": ColumnLineageDatasetFacetFieldsAdditional(
                inputFields=[
                    ColumnLineageDatasetFacetFieldsAdditionalInputFields(
                        namespace="gs://first_bucket",
                        name="dir1",
                        field="field2",
                    ),
                    ColumnLineageDatasetFacetFieldsAdditionalInputFields(
                        namespace="gs://second_bucket",
                        name="dir2",
                        field="field2",
                    ),
                ],
                transformationType="IDENTITY",
                transformationDescription="identical",
            ),
        }
    )
    result = openlineage.get_identity_column_lineage_facet(
        field_names=field_names, input_datasets=input_datasets
    )
    assert result == expected_facet
+
+
def test_get_identity_column_lineage_facet_no_field_names():
    """No field names produces an empty (not erroring) column lineage facet."""
    field_names = []
    input_datasets = [
        Dataset(namespace="gs://first_bucket", name="dir1"),
        Dataset(namespace="gs://second_bucket", name="dir2"),
    ]
    expected_facet = ColumnLineageDatasetFacet(fields={})
    result = openlineage.get_identity_column_lineage_facet(
        field_names=field_names, input_datasets=input_datasets
    )
    assert result == expected_facet
+
+
def test_get_identity_column_lineage_facet_no_input_datasets():
    """Field names without any input dataset is a usage error and raises ValueError."""
    field_names = ["field1", "field2"]
    input_datasets = []

    with pytest.raises(ValueError):
        openlineage.get_identity_column_lineage_facet(field_names=field_names, input_datasets=input_datasets)