This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5350be2194 Add support for write_on_empty in BaseSQLToGCSOperator (#28959)
5350be2194 is described below

commit 5350be2194250366536db7f78b88dc8e49c9620e
Author: Victor Chiapaikeo <[email protected]>
AuthorDate: Thu Jan 19 12:10:36 2023 -0500

    Add support for write_on_empty in BaseSQLToGCSOperator (#28959)
---
 .../providers/google/cloud/transfers/sql_to_gcs.py |  8 +++-
 .../google/cloud/transfers/test_sql_to_gcs.py      | 49 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
index 12043b05d6..16b563daa1 100644
--- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -82,6 +82,9 @@ class BaseSQLToGCSOperator(BaseOperator):
         this parameter, you must sort your dataset by partition_columns. Do this by
         passing an ORDER BY clause to the sql query. Files are uploaded to GCS as objects
         with a hive style partitioning directory structure (templated).
+    :param write_on_empty: Optional parameter to specify whether to write a file if the
+        export does not return any rows. Default is False so we will not write a file
+        if the export returns no rows.
     """
 
     template_fields: Sequence[str] = (
@@ -119,6 +122,7 @@ class BaseSQLToGCSOperator(BaseOperator):
         upload_metadata: bool = False,
         exclude_columns: set | None = None,
         partition_columns: list | None = None,
+        write_on_empty: bool = False,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -143,6 +147,7 @@ class BaseSQLToGCSOperator(BaseOperator):
         self.upload_metadata = upload_metadata
         self.exclude_columns = exclude_columns
         self.partition_columns = partition_columns
+        self.write_on_empty = write_on_empty
 
     def execute(self, context: Context):
         if self.partition_columns:
@@ -316,7 +321,8 @@ class BaseSQLToGCSOperator(BaseOperator):
         if self.export_format == "parquet":
             parquet_writer.close()
         # Last file may have 0 rows, don't yield if empty
-        if file_to_upload["file_row_count"] > 0:
+        # However, if it is the first file and self.write_on_empty is True, then yield to write an empty file
+        if file_to_upload["file_row_count"] > 0 or (file_no == 0 and self.write_on_empty):
             file_to_upload["partition_values"] = curr_partition_values
             yield file_to_upload
 
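For illustration only (not part of the patch), the guard above reads: yield the
final chunk when it contains rows, or when it is the very first chunk and
write_on_empty is set. A minimal self-contained sketch of that behavior, with a
hypothetical chunk_files generator standing in for _write_local_data_files:

    # Illustration only: a standalone generator mimicking the new yield guard.
    # `rows` stands in for the database cursor; `write_on_empty` mirrors the new flag.
    def chunk_files(rows, chunk_size=2, write_on_empty=False):
        file_no = 0
        row_count = 0
        for _row in rows:
            row_count += 1
            if row_count == chunk_size:
                yield {"file_no": file_no, "file_row_count": row_count}
                file_no += 1
                row_count = 0
        # Last file may have 0 rows; still yield the first file when write_on_empty is set.
        if row_count > 0 or (file_no == 0 and write_on_empty):
            yield {"file_no": file_no, "file_row_count": row_count}

    print(list(chunk_files([])))                       # no rows, no flag -> no files
    print(list(chunk_files([], write_on_empty=True)))  # no rows, flag set -> one empty file
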
diff --git a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
index bcb2a39100..d2c9594df7 100644
--- a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
@@ -22,6 +22,7 @@ from unittest import mock
 from unittest.mock import MagicMock, Mock
 
 import pandas as pd
+import pytest
 import unicodecsv as csv
 
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -44,6 +45,7 @@ CURSOR_DESCRIPTION = [
     ("column_c", "10", 0, 0, 0, 0, False),
 ]
 TMP_FILE_NAME = "temp-file"
+EMPTY_INPUT_DATA = []
 INPUT_DATA = [
     ["101", "school", "2015-01-01"],
     ["102", "business", "2017-05-24"],
@@ -520,3 +522,50 @@ class TestBaseSQLToGCSOperator(unittest.TestCase):
 
         concat_df = pd.concat(concat_dfs, ignore_index=True)
         assert concat_df.equals(OUTPUT_DF)
+
+    def test__write_local_data_files_csv_does_not_write_on_empty_rows(self):
+        op = DummySQLToGCSOperator(
+            sql=SQL,
+            bucket=BUCKET,
+            filename=FILENAME,
+            task_id=TASK_ID,
+            schema_filename=SCHEMA_FILE,
+            export_format="csv",
+            gzip=False,
+            schema=SCHEMA,
+            gcp_conn_id="google_cloud_default",
+        )
+        cursor = MagicMock()
+        cursor.__iter__.return_value = EMPTY_INPUT_DATA
+        cursor.description = CURSOR_DESCRIPTION
+
+        files = op._write_local_data_files(cursor)
+        # Raises StopIteration when next is called because generator returns no files
+        with pytest.raises(StopIteration):
+            next(files)["file_handle"]
+
+        assert len([f for f in files]) == 0
+
+    def test__write_local_data_files_csv_writes_empty_file_with_write_on_empty(self):
+        op = DummySQLToGCSOperator(
+            sql=SQL,
+            bucket=BUCKET,
+            filename=FILENAME,
+            task_id=TASK_ID,
+            schema_filename=SCHEMA_FILE,
+            export_format="csv",
+            gzip=False,
+            schema=SCHEMA,
+            gcp_conn_id="google_cloud_default",
+            write_on_empty=True,
+        )
+        cursor = MagicMock()
+        cursor.__iter__.return_value = EMPTY_INPUT_DATA
+        cursor.description = CURSOR_DESCRIPTION
+
+        files = op._write_local_data_files(cursor)
+        file = next(files)["file_handle"]
+        file.flush()
+
+        df = pd.read_csv(file.name)
+        assert len(df.index) == 0

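As a usage sketch (not part of this commit), the new flag is inherited by the
concrete subclasses of BaseSQLToGCSOperator, e.g. MySQLToGCSOperator; the task
id, query, bucket, and object names below are placeholders:

    from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator

    # Hypothetical task: the query may match zero rows, but a downstream load
    # expects an object in GCS, so write an empty CSV instead of skipping the upload.
    export_events = MySQLToGCSOperator(
        task_id="export_events",
        sql="SELECT * FROM events WHERE ds = '{{ ds }}'",  # placeholder query
        bucket="example-bucket",                           # placeholder bucket
        filename="events/{{ ds }}/part-{}.csv",            # {} is the file number
        export_format="csv",
        write_on_empty=True,  # new in this commit: upload a file even with 0 rows
    )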