This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 73f7d89158 Exclude partition from BigQuery table name (#42130)
73f7d89158 is described below
commit 73f7d891583b023239c73a926cf1fdc69069176b
Author: max <[email protected]>
AuthorDate: Wed Sep 11 16:52:19 2024 +0000
Exclude partition from BigQuery table name (#42130)
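For context: the change below strips the BigQuery partition decorator (the part after "$", e.g. "events$20240911") from the table name returned by split_tablename. A minimal standalone sketch of the idea; the helper name is hypothetical and used only for illustration:

    # Hypothetical illustration of the partition-stripping logic added in this commit.
    def strip_partition(table_id: str) -> str:
        # A "$" separates the table name from a partition decorator,
        # e.g. "events$20240911" addresses one daily partition of "events".
        return table_id.split("$")[0]

    assert strip_partition("events$20240911") == "events"
    assert strip_partition("events") == "events"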
---
airflow/providers/google/cloud/hooks/bigquery.py | 13 +++-
.../providers/google/cloud/hooks/test_bigquery.py | 71 +++++++++++++++++++---
2 files changed, 76 insertions(+), 8 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 8e330b17d5..b1aed15c45 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -2415,10 +2415,13 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
table_id = cmpt[1]
else:
raise ValueError(
-                f"{var_print(var_name)} Expect format of (<project.|<project:)<dataset>.<table>, "
+                f"{var_print(var_name)}Expect format of (<project.|<project:)<dataset>.<table>, "
f"got {table_input}"
)
+ # Exclude partition from the table name
+ table_id = table_id.split("$")[0]
+
if project_id is None:
if var_name is not None:
self.log.info(
@@ -3282,6 +3285,11 @@ def _escape(s: str) -> str:
return e
+@deprecated(
+ planned_removal_date="April 01, 2025",
+    use_instead="airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.split_tablename",
+ category=AirflowProviderDeprecationWarning,
+)
def split_tablename(
table_input: str, default_project_id: str, var_name: str | None = None
) -> tuple[str, str, str]:
@@ -3330,6 +3338,9 @@ def split_tablename(
f"{var_print(var_name)}Expect format of
(<project.|<project:)<dataset>.<table>, got {table_input}"
)
+ # Exclude partition from the table name
+ table_id = table_id.split("$")[0]
+
if project_id is None:
if var_name is not None:
log.info(
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index fcee80d224..81db43c0f5 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -1030,12 +1030,11 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
== result
)
-
-class TestBigQueryTableSplitter:
- def test_internal_need_default_project(self):
+ def test_split_tablename_internal_need_default_project(self):
         with pytest.raises(ValueError, match="INTERNAL: No default project is specified"):
- split_tablename("dataset.table", None)
+ self.hook.split_tablename("dataset.table", None)
+ @pytest.mark.parametrize("partition", ["$partition", ""])
@pytest.mark.parametrize(
"project_expected, dataset_expected, table_expected, table_input",
[
@@ -1046,9 +1045,11 @@ class TestBigQueryTableSplitter:
("alt1:alt", "dataset", "table", "alt1:alt:dataset.table"),
],
)
-    def test_split_tablename(self, project_expected, dataset_expected, table_expected, table_input):
+ def test_split_tablename(
+        self, project_expected, dataset_expected, table_expected, table_input, partition
+ ):
default_project_id = "project"
-        project, dataset, table = split_tablename(table_input, default_project_id)
+        project, dataset, table = self.hook.split_tablename(table_input + partition, default_project_id)
assert project_expected == project
assert dataset_expected == dataset
assert table_expected == table
@@ -1080,9 +1081,65 @@ class TestBigQueryTableSplitter:
),
],
)
- def test_invalid_syntax(self, table_input, var_name, exception_message):
+    def test_split_tablename_invalid_syntax(self, table_input, var_name, exception_message):
default_project_id = "project"
         with pytest.raises(ValueError, match=exception_message.format(table_input)):
+            self.hook.split_tablename(table_input, default_project_id, var_name)
+
+
+class TestBigQueryTableSplitter:
+ def test_internal_need_default_project(self):
+ with pytest.raises(AirflowProviderDeprecationWarning):
+ split_tablename("dataset.table", None)
+
+ @pytest.mark.parametrize("partition", ["$partition", ""])
+ @pytest.mark.parametrize(
+ "project_expected, dataset_expected, table_expected, table_input",
+ [
+ ("project", "dataset", "table", "dataset.table"),
+ ("alternative", "dataset", "table", "alternative:dataset.table"),
+ ("alternative", "dataset", "table", "alternative.dataset.table"),
+ ("alt1:alt", "dataset", "table", "alt1:alt.dataset.table"),
+ ("alt1:alt", "dataset", "table", "alt1:alt:dataset.table"),
+ ],
+ )
+ def test_split_tablename(
+        self, project_expected, dataset_expected, table_expected, table_input, partition
+ ):
+ default_project_id = "project"
+ with pytest.raises(AirflowProviderDeprecationWarning):
+ split_tablename(table_input + partition, default_project_id)
+
+ @pytest.mark.parametrize(
+ "table_input, var_name, exception_message",
+ [
+            ("alt1:alt2:alt3:dataset.table", None, "Use either : or . to specify project got {}"),
+ (
+ "alt1.alt.dataset.table",
+ None,
+                r"Expect format of \(<project\.\|<project\:\)<dataset>\.<table>, got {}",
+ ),
+ (
+ "alt1:alt2:alt.dataset.table",
+ "var_x",
+                "Format exception for var_x: Use either : or . to specify project got {}",
+ ),
+ (
+ "alt1:alt2:alt:dataset.table",
+ "var_x",
+                "Format exception for var_x: Use either : or . to specify project got {}",
+ ),
+ (
+ "alt1.alt.dataset.table",
+ "var_x",
+ r"Format exception for var_x: Expect format of "
+ r"\(<project\.\|<project:\)<dataset>.<table>, got {}",
+ ),
+ ],
+ )
+ def test_invalid_syntax(self, table_input, var_name, exception_message):
+ default_project_id = "project"
+ with pytest.raises(AirflowProviderDeprecationWarning):
split_tablename(table_input, default_project_id, var_name)
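Note: as part of this change the module-level split_tablename helper is deprecated (planned removal April 01, 2025) in favour of BigQueryHook.split_tablename, and the partition decorator is excluded in both code paths. A rough usage sketch; the connection id and the project/dataset/table names here are illustrative assumptions, not part of the commit:

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    # "google_cloud_default" is the hook's default connection id.
    hook = BigQueryHook(gcp_conn_id="google_cloud_default")
    project, dataset, table = hook.split_tablename(
        "my-project.my_dataset.events$20240911", default_project_id="my-project"
    )
    # With this commit the partition decorator is dropped from the table name:
    # ("my-project", "my_dataset", "events")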