This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch revert-28592-expose_numshards
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 65bf0e2616e78ba6ffe43748d7cd5a1614c79653
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Sep 22 09:13:13 2023 -0400

    Revert "[Python BQ] Allow setting a fixed number of Storage API streams 
(#28592)"
    
    This reverts commit 04a26da777ff4c0ed9112f07bf0f41a39bc7260d.
---
 ...Commit_Python_CrossLanguage_Gcp_Dataflow.groovy |  2 +-
 ...stCommit_Python_CrossLanguage_Gcp_Direct.groovy |  2 +-
 ...ueryStorageWriteApiSchemaTransformProvider.java | 37 +++++----------------
 .../io/external/xlang_bigqueryio_it_test.py        | 38 ++++++++--------------
 sdks/python/apache_beam/io/gcp/bigquery.py         |  9 -----
 .../documentation/io/built-in/google-bigquery.md   |  2 +-
 6 files changed, 24 insertions(+), 66 deletions(-)
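
For context: the reverted change had exposed a `num_storage_api_streams` option
on the Python WriteToBigQuery/StorageWriteToBigQuery transforms and a matching
`numStreams` field on the Java schema transform. A minimal sketch of the API
surface this revert removes (parameter names are taken from the diff below; the
table and schema values are hypothetical placeholders):

    # Sketch only: the option removed by this revert. Table and schema are
    # hypothetical placeholders.
    import apache_beam as beam

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([{'int': 1}])
            | beam.io.WriteToBigQuery(
                table='my-project:my_dataset.my_table',  # placeholder
                schema='int:INTEGER',
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                num_storage_api_streams=4))  # removed by this revert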

diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
index 1280fcb4e23..d1ee27088c7 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Dataflow.groovy
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
 // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
 PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow',
     'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
-      description('Runs end-to-end cross language GCP IO tests on the Dataflow runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"')
+      description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.')
 
 
       // Set common parameters.
diff --git a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
index e4bf771be1a..438b735fba7 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_CrossLanguage_Gcp_Direct.groovy
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
 // Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
 PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct',
     'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) {
-      description('Runs end-to-end cross language GCP IO tests on the Direct runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"')
+      description('Runs end-to-end cross language GCP IO tests on the Direct runner.')
 
       // Set common parameters.
       commonJobProperties.setTopLevelMainJobProperties(delegate)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index c3eed246723..e4461793011 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -176,13 +176,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
             !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
             invalidConfigMessage + "Output must not be empty if error handling 
specified.");
       }
-
-      if (this.getAutoSharding() != null && this.getAutoSharding()) {
-        checkArgument(
-            this.getNumStreams() == 0,
-            invalidConfigMessage
-                + "Cannot set a fixed number of streams when auto-sharding is 
enabled. Please pick only one of the two options.");
-      }
     }
 
     /**
@@ -225,17 +218,11 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
     public abstract Boolean getUseAtLeastOnceSemantics();
 
     @SchemaFieldDescription(
-        "This option enables using a dynamically determined number of Storage 
Write API streams to write to "
+        "This option enables using a dynamically determined number of shards 
to write to "
             + "BigQuery. Only applicable to unbounded data.")
     @Nullable
     public abstract Boolean getAutoSharding();
 
-    @SchemaFieldDescription(
-        "If set, the Storage API sink will default to using this number of 
write streams. " +
-            "Only applicable to unbounded data.")
-    @Nullable
-    public abstract Integer getNumStreams();
-
     @SchemaFieldDescription("This option specifies whether and where to output 
unwritable rows.")
     @Nullable
     public abstract ErrorHandling getErrorHandling();
@@ -256,8 +243,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 
       public abstract Builder setAutoSharding(Boolean autoSharding);
 
-      public abstract Builder setNumStreams(Integer numStreams);
-
       public abstract Builder setErrorHandling(ErrorHandling errorHandling);
 
       /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
@@ -336,19 +321,13 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
       if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
         Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
         Boolean autoSharding = configuration.getAutoSharding();
-        Integer numStreams = configuration.getNumStreams();
-        // Triggering frequency is only applicable for exactly-once
-        if (!configuration.getUseAtLeastOnceSemantics()) {
-          write =
-              write.withTriggeringFrequency(
-                  (triggeringFrequency == null || triggeringFrequency <= 0)
-                      ? DEFAULT_TRIGGERING_FREQUENCY
-                      : Duration.standardSeconds(triggeringFrequency));
-        }
-        // set num streams if specified, otherwise default to autoSharding
-        if (numStreams > 0) {
-          write = write.withNumStorageWriteApiStreams(numStreams);
-        } else if (autoSharding == null || autoSharding) {
+        write =
+            write.withTriggeringFrequency(
+                (triggeringFrequency == null || triggeringFrequency <= 0)
+                    ? DEFAULT_TRIGGERING_FREQUENCY
+                    : Duration.standardSeconds(triggeringFrequency));
+        // use default value true for autoSharding if not configured for STORAGE_WRITE_API
+        if (autoSharding == null || autoSharding) {
           write = write.withAutoSharding();
         }
       }
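
With the revert applied, the provider above always sets a triggering frequency
for unbounded writes (exactly-once and at-least-once alike), and auto-sharding
is used unless it is explicitly disabled. A rough sketch of that defaulting
logic, transcribed into Python for illustration (the default constant below is
a placeholder, not the provider's actual value):

    # Sketch of the post-revert defaulting behavior from the hunk above.
    DEFAULT_TRIGGERING_FREQUENCY_SECONDS = 5  # placeholder value

    def resolve_unbounded_write_options(triggering_frequency, auto_sharding):
        # Fall back to the default when the frequency is unset or non-positive.
        if triggering_frequency is None or triggering_frequency <= 0:
            triggering_frequency = DEFAULT_TRIGGERING_FREQUENCY_SECONDS
        # Auto-sharding defaults to enabled when not configured.
        use_auto_sharding = auto_sharding is None or auto_sharding
        return triggering_frequency, use_auto_sharding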
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index e234aab7314..fbfde550ea7 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -30,13 +30,12 @@ import pytest
 from hamcrest.core import assert_that as hamcrest_assert
 
 import apache_beam as beam
+from apache_beam.io.external.generate_sequence import GenerateSequence
 from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
-from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.periodicsequence import PeriodicImpulse
 from apache_beam.utils.timestamp import Timestamp
 
 # Protect against environments where bigquery library is not available.
@@ -100,13 +99,11 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
   ALL_TYPES_SCHEMA = (
       "int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING,"
       "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP")
-  _RUNNER = ""
 
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
     self.args = self.test_pipeline.get_full_options_as_args()
     self.project = self.test_pipeline.get_option('project')
-    _RUNNER = PipelineOptions(self.args).get_all_options()['runner']
 
     self.bigquery_client = BigQueryWrapper()
     self.dataset_id = '%s_%s_%s' % (
@@ -247,7 +244,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
               table=table_id, expansion_service=self.expansion_service))
     hamcrest_assert(p, bq_matcher)
 
-  def run_streaming(self, table_name, num_streams=0, use_at_least_once=False):
+  def run_streaming(
+      self, table_name, auto_sharding=False, use_at_least_once=False):
     elements = self.ELEMENTS.copy()
     schema = self.ALL_TYPES_SCHEMA
     table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name)
@@ -262,43 +260,33 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
         streaming=True,
         allow_unsafe_triggers=True)
 
-    auto_sharding = (num_streams == 0)
     with beam.Pipeline(argv=args) as p:
       _ = (
           p
-          | PeriodicImpulse(0, 4, 1)
-          | beam.Map(lambda t: elements[t])
+          | GenerateSequence(
+              start=0, stop=4, expansion_service=self.expansion_service)
+          | beam.Map(lambda x: elements[x])
           | beam.io.WriteToBigQuery(
               table=table_id,
               method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
               schema=schema,
-              triggering_frequency=1,
               with_auto_sharding=auto_sharding,
-              num_storage_api_streams=num_streams,
               use_at_least_once=use_at_least_once,
               expansion_service=self.expansion_service))
     hamcrest_assert(p, bq_matcher)
 
-  @unittest.skipUnless(
-      "dataflowrunner" in _RUNNER.lower(),
-      "The exactly-once route has the requirement "
-      "`beam:requirement:pardo:on_window_expiration:v1`, "
-      "which is currently only supported by the Dataflow runner.")
-  def test_streaming_with_fixed_num_streams(self):
-    table = 'streaming_fixed_num_streams'
-    self.run_streaming(table_name=table, num_streams=4)
-
-  @unittest.skip(
-      "Streaming to the Storage Write API sink with autosharding is broken "
-      "with Dataflow Runner V2.")
-  def test_streaming_with_auto_sharding(self):
-    table = 'streaming_with_auto_sharding'
+  def test_streaming(self):
+    table = 'streaming'
     self.run_streaming(table_name=table)
 
   def test_streaming_with_at_least_once(self):
-    table = 'streaming_with_at_least_once'
+    table = 'streaming'
     self.run_streaming(table_name=table, use_at_least_once=True)
 
+  def test_streaming_with_auto_sharding(self):
+    table = 'streaming_with_auto_sharding'
+    self.run_streaming(table_name=table, auto_sharding=True)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
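
The updated test reads from the cross-language GenerateSequence source instead
of PeriodicImpulse. A standalone sketch of that pattern (the element list is
illustrative; the expansion service is whatever the test environment provides):

    # Sketch of the streaming-source pattern used in the updated test: a
    # cross-language GenerateSequence whose outputs index a fixed element list.
    import apache_beam as beam
    from apache_beam.io.external.generate_sequence import GenerateSequence

    ELEMENTS = [{'int': i} for i in range(4)]  # illustrative elements

    def read_elements(p, expansion_service=None):
        return (
            p
            | GenerateSequence(
                start=0, stop=4, expansion_service=expansion_service)
            | beam.Map(lambda i: ELEMENTS[i]))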
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 986919fd6b8..e092ad069ad 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1869,7 +1869,6 @@ class WriteToBigQuery(PTransform):
       # TODO(https://github.com/apache/beam/issues/20712): Switch the default
       # when the feature is mature.
       with_auto_sharding=False,
-      num_storage_api_streams=0,
       ignore_unknown_columns=False,
       load_job_project_id=None,
       max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
@@ -2019,9 +2018,6 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
         determined number of shards to write to BigQuery. This can be used for
         all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
         applicable to unbounded input.
-      num_storage_api_streams: Specifies the number of write streams that the
-        Storage API sink will use. This parameter is only applicable when
-        writing unbounded data.
       ignore_unknown_columns: Accept rows that contain values that do not match
         the schema. The unknown values are ignored. Default is False,
         which treats unknown values as errors. This option is only valid for
@@ -2064,7 +2060,6 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
     self.use_at_least_once = use_at_least_once
     self.expansion_service = expansion_service
     self.with_auto_sharding = with_auto_sharding
-    self._num_storage_api_streams = num_storage_api_streams
     self.insert_retry_strategy = insert_retry_strategy
     self._validate = validate
     self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.JSON
@@ -2264,7 +2259,6 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
               triggering_frequency=triggering_frequency,
               use_at_least_once=self.use_at_least_once,
               with_auto_sharding=self.with_auto_sharding,
-              num_storage_api_streams=self._num_storage_api_streams,
               expansion_service=self.expansion_service))
 
       if is_rows:
@@ -2527,7 +2521,6 @@ class StorageWriteToBigQuery(PTransform):
       triggering_frequency=0,
       use_at_least_once=False,
       with_auto_sharding=False,
-      num_storage_api_streams=0,
       expansion_service=None):
     """Initialize a StorageWriteToBigQuery transform.
 
@@ -2565,7 +2558,6 @@ class StorageWriteToBigQuery(PTransform):
     self._triggering_frequency = triggering_frequency
     self._use_at_least_once = use_at_least_once
     self._with_auto_sharding = with_auto_sharding
-    self._num_storage_api_streams = num_storage_api_streams
     self._expansion_service = (
         expansion_service or _default_io_expansion_service())
     self.schematransform_config = SchemaAwareExternalTransform.discover_config(
@@ -2577,7 +2569,6 @@ class StorageWriteToBigQuery(PTransform):
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
         autoSharding=self._with_auto_sharding,
-        numStreams=self._num_storage_api_streams,
         createDisposition=self._create_disposition,
         table=self._table,
         triggeringFrequencySeconds=self._triggering_frequency,
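
After the revert, the Python wrapper forwards only the remaining options to the
external transform, so a pipeline that previously pinned a stream count would
rely on auto-sharding instead. A minimal post-revert usage sketch (table and
schema are hypothetical placeholders):

    # Post-revert usage sketch: sharding is controlled by with_auto_sharding
    # alone. Table and schema are hypothetical placeholders.
    import apache_beam as beam

    def write_events(events):
        return events | beam.io.WriteToBigQuery(
            table='my-project:my_dataset.events',  # placeholder
            schema='int:INTEGER,str:STRING',
            method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
            with_auto_sharding=True,
            use_at_least_once=False)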
diff --git a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index 7a31b63a3c9..eae98b84d2c 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -788,7 +788,7 @@ BigQuery Storage Write API for Python SDK currently has some limitations on supp
 {{< paragraph class="language-py" >}}
 **Note:** If you want to run WriteToBigQuery with Storage Write API from the source code, you need to run `./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build` to build the expansion-service jar. If you are running from a released Beam SDK, the jar will already be included.
 
-**Note:** Auto sharding is not currently supported for Python's Storage Write API exactly-once mode on DataflowRunner.
+**Note:** Auto sharding is not currently supported for Python's Storage Write API.
 
 {{< /paragraph >}}
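
The note above about building the expansion-service jar applies when running
from source; the sink also accepts an explicit expansion service, as the tests
in this commit do. A sketch (the address below is a hypothetical example, not a
documented default):

    # Sketch: passing an explicit expansion service address to the sink.
    import apache_beam as beam

    write = beam.io.WriteToBigQuery(
        table='my-project:my_dataset.my_table',  # hypothetical placeholder
        schema='int:INTEGER',
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        expansion_service='localhost:8097')  # hypothetical address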
 
