ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125802640
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+ return BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build',
+ append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+ """Writes data to BigQuery using the Storage Write API."""
+ URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+ FAILED_ROWS = "failed_rows"
+ FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+ def __init__(
+ self,
+ table,
+ create_disposition="",
+ write_disposition="",
+ triggering_frequency=0,
+ use_at_least_once=False,
+ expansion_service=None):
+ """Initialize a StorageWriteToBigQuery transform.
+
+ :param table:
+ Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+ :param create_disposition:
+ String specifying the strategy to take when the table doesn't
+ exist. Possible values are:
+ * ``'CREATE_IF_NEEDED'``: create the table if it does not exist.
+ * ``'CREATE_NEVER'``: fail the write if the table does not exist.
+ :param write_disposition:
+ String specifying the strategy to take when the table already
+ contains data. Possible values are:
+ * ``'WRITE_TRUNCATE'``: delete existing rows.
+ * ``'WRITE_APPEND'``: add to existing rows.
+ * ``'WRITE_EMPTY'``: fail the write if the table is not empty.
+ :param triggering_frequency:
+ The time in seconds between write commits. Should only be specified
+ for streaming pipelines. Defaults to 5 seconds.
+ :param use_at_least_once:
+ Use at-least-once semantics. This is cheaper and provides lower latency,
+ but may write duplicate records.
+ :param expansion_service:
+ The address (host:port) of the expansion service. If no expansion
+ service is provided, will attempt to run the default GCP expansion
+ service.
+ """
+ super().__init__()
+ self._table = table
+ self._create_disposition = create_disposition
+ self._write_disposition = write_disposition
+ self._triggering_frequency = triggering_frequency
+ self._use_at_least_once = use_at_least_once
+ self._expansion_service = (
+ expansion_service or _default_io_expansion_service())
+ self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+ self._expansion_service, self.URN)
+
+ def expand(self, input):
+ opts = input.pipeline.options.view_as(StandardOptions)
+ # TODO(https://github.com/apache/beam/issues/21307): Add support for
+ # OnWindowExpiration to more runners. Storage Write API requires
+ # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+ available_runners = ['DataflowRunner', 'TestDataflowRunner']
Review Comment:
This is meant for streaming use cases: `StorageApiWritesShardedRecords` has an
[`OnWindowExpiration`](https://github.com/apache/beam/blob/4da602517292adcd9ffcd7cc0acb8b0c1155aa02/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java#L763)
method, which is currently only supported by DataflowRunner. Trying to run this
with any other runner will result in:
```
ValueError: Unable to run pipeline with requirement:
beam:requirement:pardo:on_window_expiration:v1
```
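For anyone trying this out, here's a minimal streaming usage sketch that stays within the supported runners. It's hypothetical (placeholder project/topic/bucket/table names), and it assumes the input should be a schema-aware PCollection of `beam.Row` elements, which isn't shown in this diff:
```python
# Hypothetical usage sketch, not code from this PR: project, topic, bucket and
# table names are placeholders, and the assumption that the input must carry a
# Beam schema (e.g. beam.Row elements) is mine.
import apache_beam as beam
from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    streaming=True,
    runner='DataflowRunner',  # any other runner hits the
                              # on_window_expiration requirement error above
    project='my-project',
    region='us-central1',
    temp_location='gs://my-bucket/tmp')

with beam.Pipeline(options=options) as pipeline:
  _ = (
      pipeline
      | 'Read' >> beam.io.ReadFromPubSub(
          topic='projects/my-project/topics/my-topic')
      | 'ToRows' >> beam.Map(
          lambda msg: beam.Row(payload=msg.decode('utf-8')))
      | 'Write' >> StorageWriteToBigQuery(
          table='my-project:my_dataset.my_table',
          create_disposition='CREATE_IF_NEEDED',
          write_disposition='WRITE_APPEND',
          triggering_frequency=5))
```
Running the same pipeline with, say, DirectRunner or FlinkRunner would currently fail with the `ValueError` above.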