This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9c989cc Fix auto-sharding parameter for BigQuery sink with FILE LOADS
new 63c13d6 Merge pull request #14183 from [BEAM-11772] Fix auto-sharding
parameter for BigQuery sink with FILE LOADS
9c989cc is described below
commit 9c989ccde6a4053d49631e21fea4b89193a1fca0
Author: sychen <[email protected]>
AuthorDate: Tue Mar 9 22:36:36 2021 -0800
Fix auto-sharding parameter for BigQuery sink with FILE LOADS
---
.../apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 20 ++++++++++++--------
.../python/apache_beam/io/gcp/bigquery_file_loads.py | 13 +++++++++----
2 files changed, 21 insertions(+), 12 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 1828192..1372e3f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -116,6 +116,10 @@ class BatchLoads<DestinationT, ElementT>
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+ // If using auto-sharding for unbounded data, we batch the records before
triggering file write
+ // to avoid generating too many small files.
+ static final Duration FILE_TRIGGERING_BATCHING_DURATION =
Duration.standardSeconds(1);
+
// The maximum number of retries to poll the status of a job.
// It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job
finishes.
static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
@@ -294,9 +298,9 @@ class BatchLoads<DestinationT, ElementT>
.discardingFiredPanes());
results = writeStaticallyShardedFiles(inputInGlobalWindow,
tempFilePrefixView);
} else {
- // In the case of dynamic sharding, however, we use a default triggering
and instead apply the
- // user supplied triggeringFrequency to the sharding transform. See
- // writeDynamicallyShardedFilesTriggered.
+ // In the case of dynamic sharding, however, we use a default trigger
since the transform
+ // that performs sharding also batches elements to avoid generating too
many tiny files. User
+ // trigger is applied right after writes to limit the number of load
jobs.
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
@@ -569,15 +573,15 @@ class BatchLoads<DestinationT, ElementT>
// of filename, file byte size, and table destination.
PCollection<WriteBundlesToFiles.Result<DestinationT>>
writeDynamicallyShardedFilesTriggered(
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String>
tempFilePrefix) {
- // In contrast to fixed sharding with triggering, here we use a global
window with default
- // trigger and apply the user supplied triggeringFrequency in the
subsequent GroupIntoBatches
- // transform. We also ensure that the files are written if a threshold
number of records are
- // ready. Dynamic sharding is achieved via the withShardedKey() option
provided by
+ // In contrast to fixed sharding with user trigger, here we use a global
window with default
+ // trigger and rely on GroupIntoBatches transform to group, batch and at
the same time
+ // parallelize properly. We also ensure that the files are written if a
threshold number of
+ // records are ready. Dynamic sharding is achieved via the
withShardedKey() option provided by
// GroupIntoBatches.
return input
.apply(
GroupIntoBatches.<DestinationT,
ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
- .withMaxBufferingDuration(triggeringFrequency)
+ .withMaxBufferingDuration(FILE_TRIGGERING_BATCHING_DURATION)
.withShardedKey())
.setCoder(
KvCoder.of(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 298ead6..608301e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -69,6 +69,10 @@ _MAXIMUM_SOURCE_URIS = 10 * 1000
# this many records are written.
_FILE_TRIGGERING_RECORD_COUNT = 500000
+# If using auto-sharding for unbounded data, we batch the records before
+# triggering file write to avoid generating too many small files.
+_FILE_TRIGGERING_BATCHING_DURATION_SECS = 1
+
def _generate_job_name(job_name, job_type, step_name):
return bigquery_tools.generate_bq_job_name(
@@ -729,9 +733,10 @@ class BigQueryBatchFileLoads(beam.PTransform):
# We use only the user-supplied trigger on the actual BigQuery load.
# This allows us to offload the data to the filesystem.
#
- # In the case of auto sharding, however, we use a default triggering and
- # instead apply the user supplied triggering_frequency to the transfrom
that
- # performs sharding.
+ # In the case of dynamic sharding, however, we use a default trigger since
+ # the transform that performs sharding also batches elements to avoid
+ # generating too many tiny files. User trigger is applied right after
+ # writes to limit the number of load jobs.
if self.is_streaming_pipeline and not self.with_auto_sharding:
return beam.WindowInto(beam.window.GlobalWindows(),
trigger=trigger.Repeatedly(
@@ -822,7 +827,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
lambda kv: (bigquery_tools.get_hashable_destination(kv[0]), kv[1]))
| 'WithAutoSharding' >> GroupIntoBatches.WithShardedKey(
batch_size=_FILE_TRIGGERING_RECORD_COUNT,
- max_buffering_duration_secs=self.triggering_frequency,
+
max_buffering_duration_secs=_FILE_TRIGGERING_BATCHING_DURATION_SECS,
clock=clock)
| 'FromHashableTableRefAndDropShard' >> beam.Map(
lambda kvs: