This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3ae851876e2 Fix failing python BQ test (#30099)
3ae851876e2 is described below
commit 3ae851876e22be8ed09c4434c73dd654874e7ae4
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Jan 25 14:03:37 2024 -0500
Fix failing python BQ test (#30099)
---
.github/trigger_files/beam_PostCommit_Python.json | 0
sdks/python/apache_beam/io/gcp/bigquery.py | 5 +++++
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 1 +
.../python/apache_beam/io/gcp/bigquery_file_loads_test.py | 15 +++++++--------
sdks/python/test-suites/dataflow/common.gradle | 2 +-
5 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4643c8ddf0a..bba8b8a4af7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1854,6 +1854,7 @@ class WriteToBigQuery(PTransform):
kms_key=None,
batch_size=None,
max_file_size=None,
+ max_partition_size=None,
max_files_per_bundle=None,
test_client=None,
custom_gcs_temp_location=None,
@@ -1934,6 +1935,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
max_file_size (int): The maximum size for a file to be written and then
loaded into BigQuery. The default value is 4TB, which is 80% of the
limit of 5TB for BigQuery to load any file.
+ max_partition_size (int): Maximum byte size for each load job to
+ BigQuery. Defaults to 15TB. Applicable to FILE_LOADS only.
max_files_per_bundle(int): The maximum number of files to be concurrently
written by a worker. The default here is 20. Larger values will allow
writing to multiple destinations without having to reshard - but they
@@ -2059,6 +2062,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
# TODO(pabloem): Consider handling ValueProvider for this location.
self.custom_gcs_temp_location = custom_gcs_temp_location
self.max_file_size = max_file_size
+ self.max_partition_size = max_partition_size
self.max_files_per_bundle = max_files_per_bundle
self.method = method or WriteToBigQuery.Method.DEFAULT
self.triggering_frequency = triggering_frequency
@@ -2202,6 +2206,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
with_auto_sharding=self.with_auto_sharding,
temp_file_format=self._temp_file_format,
max_file_size=self.max_file_size,
+ max_partition_size=self.max_partition_size,
max_files_per_bundle=self.max_files_per_bundle,
custom_gcs_temp_location=self.custom_gcs_temp_location,
test_client=self.test_client,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 48f2ab4b36b..e1a4af31f1c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -747,6 +747,7 @@ class TriggerLoadJobs(beam.DoFn):
)
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+
job_reference = self.bq_wrapper.perform_load_job(
destination=table_reference,
source_uris=files,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 345c8e70500..0605206714e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -925,9 +925,6 @@ class BigQueryFileLoadsIT(unittest.TestCase):
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND')
- # reduce load job size to induce copy jobs
- bqfl._DEFAULT_MAX_FILE_SIZE = 10
- bqfl._MAXIMUM_LOAD_SIZE = 20
verifiers = [
BigqueryFullResultMatcher(
project=self.project,
@@ -949,8 +946,7 @@ class BigQueryFileLoadsIT(unittest.TestCase):
dest += "_2"
return dest
- args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=all_of(verifiers))
+ args = self.test_pipeline.get_full_options_as_args()
with beam.Pipeline(argv=args) as p:
# 0...4 going to table 1
@@ -961,7 +957,10 @@ class BigQueryFileLoadsIT(unittest.TestCase):
p | beam.Create(items) | bigquery.WriteToBigQuery(
table=callable_table,
create_disposition="CREATE_NEVER",
- write_disposition="WRITE_APPEND"))
+ write_disposition="WRITE_APPEND",
+ # reduce load job size to induce copy jobs
+ max_file_size=10,
+ max_partition_size=20))
hamcrest_assert(p, all_of(*verifiers))
@@ -1001,8 +1000,7 @@ class BigQueryFileLoadsIT(unittest.TestCase):
if 'foundation' in d])
]
- args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=all_of(*pipeline_verifiers))
+ args = self.test_pipeline.get_full_options_as_args()
with beam.Pipeline(argv=args) as p:
input = p | beam.Create(_ELEMENTS, reshuffle=False)
@@ -1044,6 +1042,7 @@ class BigQueryFileLoadsIT(unittest.TestCase):
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
max_file_size=20,
max_files_per_bundle=-1))
+ hamcrest_assert(p, all_of(*pipeline_verifiers))
@pytest.mark.it_postcommit
def test_bqfl_streaming(self):
diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle
index a713b82400e..5fc1751a968 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -133,7 +133,7 @@ task postCommitIT {
"test_opts": testOpts,
"sdk_location": project.ext.sdkLocation,
"suite": "postCommitIT-df${pythonVersionSuffix}",
- "collect": "it_postcommit"
+ "collect": "it_postcommit",
]
def cmdArgs = mapToArgString(argMap)
exec {