This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch release-2.45.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.45.0 by this push:
     new 7e59b529b9a Fix truncate copy job when WRITE_TRUNCATE in BigQuery 
batch load (#25101) (#25102)
7e59b529b9a is described below

commit 7e59b529b9ab212fe9c9df7e65ff74bdf140ea60
Author: Yi Hu <[email protected]>
AuthorDate: Mon Jan 23 10:06:48 2023 -0500

    Fix truncate copy job when WRITE_TRUNCATE in BigQuery batch load (#25101) 
(#25102)
    
    * Fix truncate copyjob when WRITE_TRUNCATE in BigQuery batch load
---
 CHANGES.md                                         |  3 +-
 .../apache_beam/io/gcp/bigquery_file_loads.py      |  2 +-
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 41 ++++++++++++++++++++++
 3 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 18b7ed989fb..35eee824a6c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -85,12 +85,13 @@
 
 * Avoids Cassandra syntax error when user-defined query has no where clause in 
it (Java) ([#24829](https://github.com/apache/beam/issues/24829)).
 * Fixed JDBC connection failures (Java) during handshake due to deprecated 
TLSv1(.1) protocol for the JDK. 
([#24623](https://github.com/apache/beam/issues/24623))
+* Fixed Python BigQuery Batch Load write that could truncate valid data when 
the write disposition is set to WRITE_TRUNCATE and incoming data is large (Python) 
([#24535](https://github.com/apache/beam/issues/24535)).
 
 ## Known Issues
 
 * ([#X](https://github.com/apache/beam/issues/X)).
 
-# [2.44.0] - Unreleased
+# [2.44.0] - 2023-01-12
 
 ## Highlights
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py 
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 9f889dbd023..0e06dc94c9a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -1093,7 +1093,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
                 load_job_project_id=self.load_job_project_id),
             schema_mod_job_name_pcv))
 
-    if self.create_disposition == 'WRITE_TRUNCATE':
+    if self.write_disposition == 'WRITE_TRUNCATE':
       # All loads going to the same table must be processed together so that
       # the truncation happens only once. See BEAM-24535.
       finished_temp_tables_load_job_ids_list_pc = (
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 0c0e136eae4..797ea0333ec 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -715,6 +715,47 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
           equal_to([6]),
           label='CheckCopyJobCount')
 
+  @mock.patch(
+      'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs.process',
+      wraps=lambda *x: None)
+  def test_multiple_partition_files_write_truncate(self, mock_call_process):
+    destination = 'project1:dataset1.table1'
+
+    job_reference = bigquery_api.JobReference()
+    job_reference.projectId = 'project1'
+    job_reference.jobId = 'job_name1'
+    result_job = mock.Mock()
+    result_job.jobReference = job_reference
+
+    mock_job = mock.Mock()
+    mock_job.status.state = 'DONE'
+    mock_job.status.errorResult = None
+    mock_job.jobReference = job_reference
+
+    bq_client = mock.Mock()
+    bq_client.jobs.Get.return_value = mock_job
+
+    bq_client.jobs.Insert.return_value = result_job
+    bq_client.tables.Delete.return_value = None
+
+    with TestPipeline('DirectRunner') as p:
+      _ = (
+          p
+          | beam.Create(_ELEMENTS, reshuffle=False)
+          | bqfl.BigQueryBatchFileLoads(
+              destination,
+              custom_gcs_temp_location=self._new_tempdir(),
+              test_client=bq_client,
+              validate=False,
+              temp_file_format=bigquery_tools.FileFormat.JSON,
+              max_file_size=45,
+              max_partition_size=80,
+              max_files_per_partition=2,
+              write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
+
+    # TriggerCopyJobs.process must be invoked exactly once
+    self.assertEqual(mock_call_process.call_count, 1)
+
   @parameterized.expand([
       param(is_streaming=False, with_auto_sharding=False),
       param(is_streaming=True, with_auto_sharding=False),

Reply via email to