This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ae851876e2 Fix failing python BQ test (#30099)
3ae851876e2 is described below

commit 3ae851876e22be8ed09c4434c73dd654874e7ae4
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Jan 25 14:03:37 2024 -0500

    Fix failing python BQ test (#30099)
---
 .github/trigger_files/beam_PostCommit_Python.json         |  0
 sdks/python/apache_beam/io/gcp/bigquery.py                |  5 +++++
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py     |  1 +
 .../python/apache_beam/io/gcp/bigquery_file_loads_test.py | 15 +++++++--------
 sdks/python/test-suites/dataflow/common.gradle            |  2 +-
 5 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4643c8ddf0a..bba8b8a4af7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1854,6 +1854,7 @@ class WriteToBigQuery(PTransform):
       kms_key=None,
       batch_size=None,
       max_file_size=None,
+      max_partition_size=None,
       max_files_per_bundle=None,
       test_client=None,
       custom_gcs_temp_location=None,
@@ -1934,6 +1935,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
       max_file_size (int): The maximum size for a file to be written and then
         loaded into BigQuery. The default value is 4TB, which is 80% of the
         limit of 5TB for BigQuery to load any file.
+      max_partition_size (int): Maximum byte size for each load job to
+        BigQuery. Defaults to 15TB. Applicable to FILE_LOADS only.
       max_files_per_bundle(int): The maximum number of files to be concurrently
         written by a worker. The default here is 20. Larger values will allow
         writing to multiple destinations without having to reshard - but they
@@ -2059,6 +2062,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
     # TODO(pabloem): Consider handling ValueProvider for this location.
     self.custom_gcs_temp_location = custom_gcs_temp_location
     self.max_file_size = max_file_size
+    self.max_partition_size = max_partition_size
     self.max_files_per_bundle = max_files_per_bundle
     self.method = method or WriteToBigQuery.Method.DEFAULT
     self.triggering_frequency = triggering_frequency
@@ -2202,6 +2206,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
           with_auto_sharding=self.with_auto_sharding,
           temp_file_format=self._temp_file_format,
           max_file_size=self.max_file_size,
+          max_partition_size=self.max_partition_size,
           max_files_per_bundle=self.max_files_per_bundle,
           custom_gcs_temp_location=self.custom_gcs_temp_location,
           test_client=self.test_client,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 48f2ab4b36b..e1a4af31f1c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -747,6 +747,7 @@ class TriggerLoadJobs(beam.DoFn):
     )
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+
     job_reference = self.bq_wrapper.perform_load_job(
         destination=table_reference,
         source_uris=files,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 345c8e70500..0605206714e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -925,9 +925,6 @@ class BigQueryFileLoadsIT(unittest.TestCase):
         create_disposition='CREATE_IF_NEEDED',
         write_disposition='WRITE_APPEND')
 
-    # reduce load job size to induce copy jobs
-    bqfl._DEFAULT_MAX_FILE_SIZE = 10
-    bqfl._MAXIMUM_LOAD_SIZE = 20
     verifiers = [
         BigqueryFullResultMatcher(
             project=self.project,
@@ -949,8 +946,7 @@ class BigQueryFileLoadsIT(unittest.TestCase):
         dest += "_2"
       return dest
 
-    args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=all_of(verifiers))
+    args = self.test_pipeline.get_full_options_as_args()
 
     with beam.Pipeline(argv=args) as p:
       # 0...4 going to table 1
@@ -961,7 +957,10 @@ class BigQueryFileLoadsIT(unittest.TestCase):
           p | beam.Create(items) | bigquery.WriteToBigQuery(
               table=callable_table,
               create_disposition="CREATE_NEVER",
-              write_disposition="WRITE_APPEND"))
+              write_disposition="WRITE_APPEND",
+              # reduce load job size to induce copy jobs
+              max_file_size=10,
+              max_partition_size=20))
 
     hamcrest_assert(p, all_of(*verifiers))
 
@@ -1001,8 +1000,7 @@ class BigQueryFileLoadsIT(unittest.TestCase):
                   if 'foundation' in d])
     ]
 
-    args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=all_of(*pipeline_verifiers))
+    args = self.test_pipeline.get_full_options_as_args()
 
     with beam.Pipeline(argv=args) as p:
       input = p | beam.Create(_ELEMENTS, reshuffle=False)
@@ -1044,6 +1042,7 @@ class BigQueryFileLoadsIT(unittest.TestCase):
               write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
               max_file_size=20,
               max_files_per_bundle=-1))
+    hamcrest_assert(p, all_of(*pipeline_verifiers))
 
   @pytest.mark.it_postcommit
   def test_bqfl_streaming(self):
diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle
index a713b82400e..5fc1751a968 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -133,7 +133,7 @@ task postCommitIT {
         "test_opts": testOpts,
         "sdk_location": project.ext.sdkLocation,
         "suite": "postCommitIT-df${pythonVersionSuffix}",
-        "collect": "it_postcommit"
+        "collect": "it_postcommit",
     ]
     def cmdArgs = mapToArgString(argMap)
     exec {

Reply via email to