This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5350be2194 Add support for write_on_empty in BaseSQLToGCSOperator (#28959)
5350be2194 is described below
commit 5350be2194250366536db7f78b88dc8e49c9620e
Author: Victor Chiapaikeo <[email protected]>
AuthorDate: Thu Jan 19 12:10:36 2023 -0500
Add support for write_on_empty in BaseSQLToGCSOperator (#28959)
---
.../providers/google/cloud/transfers/sql_to_gcs.py | 8 +++-
.../google/cloud/transfers/test_sql_to_gcs.py | 49 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
index 12043b05d6..16b563daa1 100644
--- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -82,6 +82,9 @@ class BaseSQLToGCSOperator(BaseOperator):
this parameter, you must sort your dataset by partition_columns. Do this by
passing an ORDER BY clause to the sql query. Files are uploaded to GCS as objects
with a hive style partitioning directory structure (templated).
+ :param write_on_empty: Optional parameter to specify whether to write a file if the
+ export does not return any rows. Default is False so we will not write a file
+ if the export returns no rows.
"""
template_fields: Sequence[str] = (
@@ -119,6 +122,7 @@ class BaseSQLToGCSOperator(BaseOperator):
upload_metadata: bool = False,
exclude_columns: set | None = None,
partition_columns: list | None = None,
+ write_on_empty: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -143,6 +147,7 @@ class BaseSQLToGCSOperator(BaseOperator):
self.upload_metadata = upload_metadata
self.exclude_columns = exclude_columns
self.partition_columns = partition_columns
+ self.write_on_empty = write_on_empty
def execute(self, context: Context):
if self.partition_columns:
@@ -316,7 +321,8 @@ class BaseSQLToGCSOperator(BaseOperator):
if self.export_format == "parquet":
parquet_writer.close()
# Last file may have 0 rows, don't yield if empty
- if file_to_upload["file_row_count"] > 0:
+ # However, if it is the first file and self.write_on_empty is True, then yield to write an empty file
+ if file_to_upload["file_row_count"] > 0 or (file_no == 0 and self.write_on_empty):
file_to_upload["partition_values"] = curr_partition_values
yield file_to_upload
diff --git a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
index bcb2a39100..d2c9594df7 100644
--- a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
@@ -22,6 +22,7 @@ from unittest import mock
from unittest.mock import MagicMock, Mock
import pandas as pd
+import pytest
import unicodecsv as csv
from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -44,6 +45,7 @@ CURSOR_DESCRIPTION = [
("column_c", "10", 0, 0, 0, 0, False),
]
TMP_FILE_NAME = "temp-file"
+EMPTY_INPUT_DATA = []
INPUT_DATA = [
["101", "school", "2015-01-01"],
["102", "business", "2017-05-24"],
@@ -520,3 +522,50 @@ class TestBaseSQLToGCSOperator(unittest.TestCase):
concat_df = pd.concat(concat_dfs, ignore_index=True)
assert concat_df.equals(OUTPUT_DF)
+
+ def test__write_local_data_files_csv_does_not_write_on_empty_rows(self):
+ op = DummySQLToGCSOperator(
+ sql=SQL,
+ bucket=BUCKET,
+ filename=FILENAME,
+ task_id=TASK_ID,
+ schema_filename=SCHEMA_FILE,
+ export_format="csv",
+ gzip=False,
+ schema=SCHEMA,
+ gcp_conn_id="google_cloud_default",
+ )
+ cursor = MagicMock()
+ cursor.__iter__.return_value = EMPTY_INPUT_DATA
+ cursor.description = CURSOR_DESCRIPTION
+
+ files = op._write_local_data_files(cursor)
+ # Raises StopIteration when next is called because generator returns no files
+ with pytest.raises(StopIteration):
+ next(files)["file_handle"]
+
+ assert len([f for f in files]) == 0
+
+ def test__write_local_data_files_csv_writes_empty_file_with_write_on_empty(self):
+ op = DummySQLToGCSOperator(
+ sql=SQL,
+ bucket=BUCKET,
+ filename=FILENAME,
+ task_id=TASK_ID,
+ schema_filename=SCHEMA_FILE,
+ export_format="csv",
+ gzip=False,
+ schema=SCHEMA,
+ gcp_conn_id="google_cloud_default",
+ write_on_empty=True,
+ )
+ cursor = MagicMock()
+ cursor.__iter__.return_value = EMPTY_INPUT_DATA
+ cursor.description = CURSOR_DESCRIPTION
+
+ files = op._write_local_data_files(cursor)
+ file = next(files)["file_handle"]
+ file.flush()
+
+ df = pd.read_csv(file.name)
+ assert len(df.index) == 0