This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch release-2.51.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.51.0 by this push:
new 2420c904f66 Cherry picking PR #28618 into 2.51.0 (setting numShards for Python BigQuery xlang) (#28631)
2420c904f66 is described below
commit 2420c904f66a8882eb454416f4029604c9160502
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Sep 27 17:43:54 2023 -0400
Cherry picking PR #28618 into 2.51.0 (setting numShards for Python BigQuery xlang) (#28631)
---
...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy | 2 +-
...stCommit_Python_CrossLanguage_Gcp_Direct.groovy | 2 +-
...ueryStorageWriteApiSchemaTransformProvider.java | 37 ++++++++++++++----
.../io/external/xlang_bigqueryio_it_test.py | 44 ++++++++++++++--------
sdks/python/apache_beam/io/gcp/bigquery.py | 9 +++++
.../documentation/io/built-in/google-bigquery.md | 2 +-
6 files changed, 69 insertions(+), 27 deletions(-)
diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
index d1ee27088c7..1280fcb4e23 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow', 'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
- description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.')
+ description('Runs end-to-end cross language GCP IO tests on the Dataflow runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"')
// Set common parameters.
diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
index 438b735fba7..e4bf771be1a 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct', 'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) {
- description('Runs end-to-end cross language GCP IO tests on the Direct runner.')
+ description('Runs end-to-end cross language GCP IO tests on the Direct runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"')
// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index e4461793011..1b9eb309ec4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -176,6 +176,13 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
!Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
invalidConfigMessage + "Output must not be empty if error handling specified.");
}
+
+ if (this.getAutoSharding() != null && this.getAutoSharding()) {
+ checkArgument(
+ this.getNumStreams() == 0,
+ invalidConfigMessage
+ + "Cannot set a fixed number of streams when auto-sharding is
enabled. Please pick only one of the two options.");
+ }
}
/**
@@ -218,11 +225,17 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
public abstract Boolean getUseAtLeastOnceSemantics();
@SchemaFieldDescription(
- "This option enables using a dynamically determined number of shards
to write to "
+ "This option enables using a dynamically determined number of Storage
Write API streams to write to "
+ "BigQuery. Only applicable to unbounded data.")
@Nullable
public abstract Boolean getAutoSharding();
+ @SchemaFieldDescription(
+ "Specifies the number of write streams that the Storage API sink will
use. "
+ + "This parameter is only applicable when writing unbounded data.")
+ @Nullable
+ public abstract Integer getNumStreams();
+
@SchemaFieldDescription("This option specifies whether and where to output
unwritable rows.")
@Nullable
public abstract ErrorHandling getErrorHandling();
@@ -243,6 +256,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
public abstract Builder setAutoSharding(Boolean autoSharding);
+ public abstract Builder setNumStreams(Integer numStreams);
+
public abstract Builder setErrorHandling(ErrorHandling errorHandling);
/** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
@@ -321,13 +336,19 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
Boolean autoSharding = configuration.getAutoSharding();
- write =
- write.withTriggeringFrequency(
- (triggeringFrequency == null || triggeringFrequency <= 0)
- ? DEFAULT_TRIGGERING_FREQUENCY
- : Duration.standardSeconds(triggeringFrequency));
- // use default value true for autoSharding if not configured for STORAGE_WRITE_API
- if (autoSharding == null || autoSharding) {
+ Integer numStreams = configuration.getNumStreams();
+ // Triggering frequency is only applicable for exactly-once
+ if (!configuration.getUseAtLeastOnceSemantics()) {
+ write =
+ write.withTriggeringFrequency(
+ (triggeringFrequency == null || triggeringFrequency <= 0)
+ ? DEFAULT_TRIGGERING_FREQUENCY
+ : Duration.standardSeconds(triggeringFrequency));
+ }
+ // set num streams if specified, otherwise default to autoSharding
+ if (numStreams > 0) {
+ write = write.withNumStorageWriteApiStreams(numStreams);
+ } else if (autoSharding == null || autoSharding) {
write = write.withAutoSharding();
}
}
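For reference, the selection order applied by the provider change above reads as: a positive, explicit stream count takes precedence, otherwise auto-sharding remains the default. The Python rendering below is an illustration only; the helper name is hypothetical and not part of this commit.

    def resolve_sharding(num_streams, auto_sharding):
        # Mirrors the Java branch above: an explicit positive stream count wins;
        # otherwise auto-sharding applies unless it was explicitly disabled.
        if num_streams is not None and num_streams > 0:
            return 'fixed Storage Write API streams: %d' % num_streams
        if auto_sharding is None or auto_sharding:
            return 'auto-sharding'
        return 'no explicit sharding configured'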
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index fbfde550ea7..5917ca4dc72 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -30,12 +30,13 @@ import pytest
from hamcrest.core import assert_that as hamcrest_assert
import apache_beam as beam
-from apache_beam.io.external.generate_sequence import GenerateSequence
from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp
# Protect against environments where bigquery library is not available.
@@ -51,9 +52,9 @@ _LOGGER = logging.getLogger(__name__)
@pytest.mark.uses_gcp_java_expansion_service
-@unittest.skipUnless(
- os.environ.get('EXPANSION_PORT'),
- "EXPANSION_PORT environment var is not provided.")
+# @unittest.skipUnless(
+# os.environ.get('EXPANSION_PORT'),
+# "EXPANSION_PORT environment var is not provided.")
class BigQueryXlangStorageWriteIT(unittest.TestCase):
BIGQUERY_DATASET = 'python_xlang_storage_write'
@@ -104,6 +105,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.args = self.test_pipeline.get_full_options_as_args()
self.project = self.test_pipeline.get_option('project')
+ self._runner = PipelineOptions(self.args).get_all_options()['runner']
self.bigquery_client = BigQueryWrapper()
self.dataset_id = '%s_%s_%s' % (
@@ -244,8 +246,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
table=table_id, expansion_service=self.expansion_service))
hamcrest_assert(p, bq_matcher)
- def run_streaming(
- self, table_name, auto_sharding=False, use_at_least_once=False):
+ def run_streaming(self, table_name, num_streams=0, use_at_least_once=False):
elements = self.ELEMENTS.copy()
schema = self.ALL_TYPES_SCHEMA
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name)
@@ -260,33 +261,44 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
streaming=True,
allow_unsafe_triggers=True)
+ auto_sharding = (num_streams == 0)
with beam.Pipeline(argv=args) as p:
_ = (
p
- | GenerateSequence(
- start=0, stop=4, expansion_service=self.expansion_service)
- | beam.Map(lambda x: elements[x])
+ | PeriodicImpulse(0, 4, 1)
+ | beam.Map(lambda t: elements[t])
| beam.io.WriteToBigQuery(
table=table_id,
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
schema=schema,
+ triggering_frequency=1,
with_auto_sharding=auto_sharding,
+ num_storage_api_streams=num_streams,
use_at_least_once=use_at_least_once,
expansion_service=self.expansion_service))
hamcrest_assert(p, bq_matcher)
- def test_streaming(self):
- table = 'streaming'
+ def test_streaming_with_fixed_num_streams(self):
+ # skip if dataflow runner is not specified
+ if not self._runner or "dataflowrunner" not in self._runner.lower():
+ self.skipTest(
+ "The exactly-once route has the requirement "
+ "`beam:requirement:pardo:on_window_expiration:v1`, "
+ "which is currently only supported by the Dataflow runner")
+ table = 'streaming_fixed_num_streams'
+ self.run_streaming(table_name=table, num_streams=4)
+
+ @unittest.skip(
+ "Streaming to the Storage Write API sink with autosharding is broken "
+ "with Dataflow Runner V2.")
+ def test_streaming_with_auto_sharding(self):
+ table = 'streaming_with_auto_sharding'
self.run_streaming(table_name=table)
def test_streaming_with_at_least_once(self):
- table = 'streaming'
+ table = 'streaming_with_at_least_once'
self.run_streaming(table_name=table, use_at_least_once=True)
- def test_streaming_with_auto_sharding(self):
- table = 'streaming_with_auto_sharding'
- self.run_streaming(table_name=table, auto_sharding=True)
-
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index e092ad069ad..986919fd6b8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1869,6 +1869,7 @@ class WriteToBigQuery(PTransform):
# TODO(https://github.com/apache/beam/issues/20712): Switch the default
# when the feature is mature.
with_auto_sharding=False,
+ num_storage_api_streams=0,
ignore_unknown_columns=False,
load_job_project_id=None,
max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
@@ -2018,6 +2019,9 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
determined number of shards to write to BigQuery. This can be used for
all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
applicable to unbounded input.
+ num_storage_api_streams: Specifies the number of write streams that the
+ Storage API sink will use. This parameter is only applicable when
+ writing unbounded data.
ignore_unknown_columns: Accept rows that contain values that do not match
the schema. The unknown values are ignored. Default is False,
which treats unknown values as errors. This option is only valid for
@@ -2060,6 +2064,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
self.use_at_least_once = use_at_least_once
self.expansion_service = expansion_service
self.with_auto_sharding = with_auto_sharding
+ self._num_storage_api_streams = num_storage_api_streams
self.insert_retry_strategy = insert_retry_strategy
self._validate = validate
self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.JSON
@@ -2259,6 +2264,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
triggering_frequency=triggering_frequency,
use_at_least_once=self.use_at_least_once,
with_auto_sharding=self.with_auto_sharding,
+ num_storage_api_streams=self._num_storage_api_streams,
expansion_service=self.expansion_service))
if is_rows:
@@ -2521,6 +2527,7 @@ class StorageWriteToBigQuery(PTransform):
triggering_frequency=0,
use_at_least_once=False,
with_auto_sharding=False,
+ num_storage_api_streams=0,
expansion_service=None):
"""Initialize a StorageWriteToBigQuery transform.
@@ -2558,6 +2565,7 @@ class StorageWriteToBigQuery(PTransform):
self._triggering_frequency = triggering_frequency
self._use_at_least_once = use_at_least_once
self._with_auto_sharding = with_auto_sharding
+ self._num_storage_api_streams = num_storage_api_streams
self._expansion_service = (
expansion_service or _default_io_expansion_service())
self.schematransform_config = SchemaAwareExternalTransform.discover_config(
@@ -2569,6 +2577,7 @@ class StorageWriteToBigQuery(PTransform):
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
autoSharding=self._with_auto_sharding,
+ numStreams=self._num_storage_api_streams,
createDisposition=self._create_disposition,
table=self._table,
triggeringFrequencySeconds=self._triggering_frequency,
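From the Python side, the new keyword added above is passed straight through WriteToBigQuery to the cross-language transform. The sketch below is a minimal illustration only, assuming a streaming (unbounded) source in practice and using placeholder project, dataset, table, and schema values.

    import apache_beam as beam

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([{'id': 1}, {'id': 2}])  # stand-in for an unbounded source
            | beam.io.WriteToBigQuery(
                table='my-project:my_dataset.my_table',  # placeholder
                schema='id:INTEGER',
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                triggering_frequency=5,
                num_storage_api_streams=4))  # pin four Storage API write streams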
diff --git a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index eae98b84d2c..7a31b63a3c9 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -788,7 +788,7 @@ BigQuery Storage Write API for Python SDK currently has some limitations on supp
{{< paragraph class="language-py" >}}
**Note:** If you want to run WriteToBigQuery with Storage Write API from the source code, you need to run `./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build` to build the expansion-service jar. If you are running from a released Beam SDK, the jar will already be included.
-**Note:** Auto sharding is not currently supported for Python's Storage Write API.
+**Note:** Auto sharding is not currently supported for Python's Storage Write API exactly-once mode on DataflowRunner.
{{< /paragraph >}}
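As the updated note above indicates, the auto-sharding limitation is described for exactly-once mode on DataflowRunner; the configurations exercised in the tests above are a fixed stream count via num_storage_api_streams or at-least-once semantics. A hedged sketch of the at-least-once variant follows, with placeholder table and schema values.

    import apache_beam as beam

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([{'id': 1}])  # stand-in for a real source
            | beam.io.WriteToBigQuery(
                table='my-project:my_dataset.my_table',  # placeholder
                schema='id:INTEGER',
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                use_at_least_once=True))  # at-least-once semantics, as exercised in the tests above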