chamikaramj commented on a change in pull request #13170:
URL: https://github.com/apache/beam/pull/13170#discussion_r527931340
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1931,3 +1903,132 @@ def file_path_to_remove(unused_elm):
*self._args,
**self._kwargs))
| _PassThroughThenCleanup(files_to_remove_pcoll))
+
+
+class ReadFromBigQueryRequest:
+ """
+ Class that defines data to read from BQ.
+ """
+ def __init__(
+ self,
+ query: str = None,
+ use_standard_sql: bool = True,
Review comment:
What about other args here ?
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1823
(probably this can be a followup PR)
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -79,6 +79,41 @@
`ReadFromBigQuery`, you can use the flag `use_json_exports` to export
data as JSON, and receive base64-encoded bytes.
+ReadAllFromBigQuery
+-------------------
+Beam 2.27.0 introduces a new transform called `ReadAllFromBigQuery` which
+allows you to define table and query reads from BigQuery at pipeline
+runtime.:::
+
+ read_requests = p | beam.Create([
+ ReadFromBigQueryRequest(query='SELECT * FROM mydataset.mytable'),
+ ReadFromBigQueryRequest(table='myproject.mydataset.mytable')])
+ results = read_requests | ReadAllFromBigQuery()
+
+A good application for this transform is in streaming pipelines to
+refresh a side input coming from BigQuery. This would work like so:::
+
+ side_input = (
+ p
+ | 'PeriodicImpulse' >> PeriodicImpulse(
+ first_timestamp, last_timestamp, interval, True)
+ | 'MapToReadRequest' >> beam.Map(
+ lambda x: ReadFromBigQueryRequest(table='dataset.table'))
+ | beam.io.ReadAllFromBigQuery())
+ main_input = (
+ p
+ | 'MpImpulse' >> beam.Create(sample_main_input_elements)
+ |
+ 'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
+ | 'WindowMpInto' >> beam.WindowInto(
+ window.FixedWindows(main_input_windowing_interval)))
+ result = (
+ main_input
+ | 'ApplyCrossJoin' >> beam.FlatMap(
+ cross_join, rights=beam.pvalue.AsIter(side_input)))
+
+**Note**: This transform is supported on Portable runners only.
Review comment:
Why ? Also is this supported for Dataflow Runnner v2 ?
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +127,258 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class _BigQueryReadSplit(beam.transforms.DoFn):
+ def __init__(
+ self,
+ options: PipelineOptions,
+ gcs_location: Union[str, ValueProvider] = None,
+ use_json_exports: bool = False,
+ bigquery_job_labels: Dict[str, str] = None,
+ step_name: str = None,
+ job_name: str = None,
+ unique_id: str = None,
+ kms_key: str = None,
+ project: str = None,
+ temp_dataset: Union[str, DatasetReference] = None):
+ self.options = options
+ self.use_json_exports = use_json_exports
+ self.gcs_location = gcs_location
+ self.bigquery_job_labels = bigquery_job_labels or {}
+ self._step_name = step_name
+ self._job_name = job_name or 'BQ_READ_SPLIT'
+ self._source_uuid = unique_id
+ self.kms_key = kms_key
+ self.project = project
+ self.temp_dataset = temp_dataset
+ self.bq_io_metadata = None
+
+ def display_data(self):
+ return {
+ 'use_json_exports': str(self.use_json_exports),
+ 'gcs_location': str(self.gcs_location),
+ 'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
+ 'kms_key': str(self.kms_key),
+ 'project': str(self.project),
+ 'temp_dataset': str(self.temp_dataset)
+ }
+
+ def _get_temp_dataset(self):
+ if isinstance(self.temp_dataset, str):
+ return DatasetReference(
+ datasetId=self.temp_dataset, projectId=self._get_project())
+ else:
+ return self.temp_dataset
+
+ def process(self, element: 'ReadFromBigQueryRequest', *args,
+ **kwargs) -> Iterable[BoundedSource]:
+ bq = bigquery_tools.BigQueryWrapper(
+ temp_dataset_id=(
+ self._get_temp_dataset().datasetId if self._get_temp_dataset(
Review comment:
Will we run into BQ quotas if we try to create a large number of
datasets ?
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +127,258 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class _BigQueryReadSplit(beam.transforms.DoFn):
+ def __init__(
+ self,
+ options: PipelineOptions,
+ gcs_location: Union[str, ValueProvider] = None,
+ use_json_exports: bool = False,
+ bigquery_job_labels: Dict[str, str] = None,
+ step_name: str = None,
+ job_name: str = None,
+ unique_id: str = None,
+ kms_key: str = None,
+ project: str = None,
+ temp_dataset: Union[str, DatasetReference] = None):
+ self.options = options
+ self.use_json_exports = use_json_exports
+ self.gcs_location = gcs_location
+ self.bigquery_job_labels = bigquery_job_labels or {}
+ self._step_name = step_name
+ self._job_name = job_name or 'BQ_READ_SPLIT'
+ self._source_uuid = unique_id
+ self.kms_key = kms_key
+ self.project = project
+ self.temp_dataset = temp_dataset
+ self.bq_io_metadata = None
+
+ def display_data(self):
+ return {
+ 'use_json_exports': str(self.use_json_exports),
+ 'gcs_location': str(self.gcs_location),
+ 'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
+ 'kms_key': str(self.kms_key),
+ 'project': str(self.project),
+ 'temp_dataset': str(self.temp_dataset)
+ }
+
+ def _get_temp_dataset(self):
+ if isinstance(self.temp_dataset, str):
+ return DatasetReference(
+ datasetId=self.temp_dataset, projectId=self._get_project())
+ else:
+ return self.temp_dataset
+
+ def process(self, element: 'ReadFromBigQueryRequest', *args,
+ **kwargs) -> Iterable[BoundedSource]:
+ bq = bigquery_tools.BigQueryWrapper(
+ temp_dataset_id=(
+ self._get_temp_dataset().datasetId if self._get_temp_dataset(
+ ) else None))
+
+ if element.query is not None:
+ self._setup_temporary_dataset(bq, element)
+ table_reference = self._execute_query(bq, element)
+ else:
+ assert element.table
+ table_reference = bigquery_tools.parse_table_reference(
+ element.table, project=self._get_project())
+
+ if not table_reference.projectId:
+ table_reference.projectId = self._get_project()
+
+ schema, metadata_list = self._export_files(bq, element, table_reference)
+
+ for metadata in metadata_list:
+ yield self._create_source(metadata.path, schema)
+
+ if element.query is not None:
+ bq.clean_up_temporary_dataset(self._get_project())
+
+ def _get_bq_metadata(self):
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+ return self.bq_io_metadata
+
+ def _create_source(self, path, schema):
+ if not self.use_json_exports:
+ return _create_avro_source(path, use_fastavro=True)
+ else:
+ return _TextSource(
+ path,
+ min_bundle_size=0,
+ compression_type=CompressionTypes.UNCOMPRESSED,
+ strip_trailing_newlines=True,
+ coder=_JsonToDictCoder(schema))
+
+ def _setup_temporary_dataset(
+ self,
+ bq: bigquery_tools.BigQueryWrapper,
+ element: 'ReadFromBigQueryRequest'):
+ location = bq.get_query_location(
+ self._get_project(), element.query, not element.use_standard_sql)
+ bq.create_temporary_dataset(self._get_project(), location)
+
+ def _execute_query(
+ self,
+ bq: bigquery_tools.BigQueryWrapper,
+ element: 'ReadFromBigQueryRequest'):
+ query_job_name = bigquery_tools.generate_bq_job_name(
+ self._job_name,
+ self._source_uuid,
+ bigquery_tools.BigQueryJobTypes.QUERY,
+ random.randint(0, 1000))
+ job = bq._start_query_job(
+ self._get_project(),
+ element.query,
+ not element.use_standard_sql,
+ element.flatten_results,
+ job_id=query_job_name,
+ kms_key=self.kms_key,
+ job_labels=self._get_bq_metadata().add_additional_bq_job_labels(
+ self.bigquery_job_labels))
+ job_ref = job.jobReference
+ bq.wait_for_bq_job(job_ref, max_retries=0)
+ return bq._get_temp_table(self._get_project())
+
+ def _export_files(
+ self,
+ bq: bigquery_tools.BigQueryWrapper,
+ element: 'ReadFromBigQueryRequest',
+ table_reference: TableReference):
+ """Runs a BigQuery export job.
+
+ Returns:
+ bigquery.TableSchema instance, a list of FileMetadata instances
+ """
+ job_labels = self._get_bq_metadata().add_additional_bq_job_labels(
+ self.bigquery_job_labels)
+ export_job_name = bigquery_tools.generate_bq_job_name(
+ self._job_name,
+ self._source_uuid,
+ bigquery_tools.BigQueryJobTypes.EXPORT,
+ element.obj_id)
+ temp_location = self.options.view_as(GoogleCloudOptions).temp_location
+ gcs_location = bigquery_export_destination_uri(
+ self.gcs_location,
+ temp_location,
+ '%s%s' % (self._source_uuid, element.obj_id))
+ if self.use_json_exports:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ table_reference,
+ bigquery_tools.FileFormat.JSON,
+ project=self._get_project(),
+ job_labels=job_labels,
+ include_header=False)
+ else:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ table_reference,
+ bigquery_tools.FileFormat.AVRO,
+ project=self._get_project(),
+ include_header=False,
+ job_labels=job_labels,
+ use_avro_logical_types=True)
+ bq.wait_for_bq_job(job_ref)
+ metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+ if isinstance(table_reference, ValueProvider):
+ table_ref = bigquery_tools.parse_table_reference(
+ element.table, project=self._get_project())
+ else:
+ table_ref = table_reference
+ table = bq.get_table(
+ table_ref.projectId, table_ref.datasetId, table_ref.tableId)
+
+ return table.schema, metadata_list
+
+ def _get_project(self):
+ """Returns the project that queries and exports will be billed to."""
+
+ project = self.options.view_as(GoogleCloudOptions).project
+ if isinstance(project, ValueProvider):
+ project = project.get()
+ if not project:
+ project = self.project
+ return project
+
+
+FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')
+
+
+class _JsonToDictCoder(coders.Coder):
+ """A coder for a JSON string to a Python dict."""
+ def __init__(self, table_schema):
Review comment:
Please add unit tests for the new coder.
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +127,258 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class _BigQueryReadSplit(beam.transforms.DoFn):
Review comment:
Add some pydocs here to clarify what this does ?
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
##########
@@ -298,6 +298,109 @@ def test_iobase_source(self):
assert_that(result, equal_to(self.get_expected_data(native=False)))
+class ReadAllBQTests(BigQueryReadIntegrationTests):
+ TABLE_DATA_1 = [{
+ 'number': 1, 'str': 'abc'
+ }, {
+ 'number': 2, 'str': 'def'
+ }, {
+ 'number': 3, 'str': u'你好'
+ }, {
+ 'number': 4, 'str': u'привет'
+ }]
+
+ TABLE_DATA_2 = [{
+ 'number': 10, 'str': 'abcd'
+ }, {
+ 'number': 20, 'str': 'defg'
+ }, {
+ 'number': 30, 'str': u'你好'
+ }, {
+ 'number': 40, 'str': u'привет'
+ }]
+
+ TABLE_DATA_3 = [{'number': 10, 'str': 'abcde', 'extra': 3}]
+
+ @classmethod
+ def setUpClass(cls):
+ super(ReadAllBQTests, cls).setUpClass()
+ cls.SCHEMA_BQ = cls.create_bq_schema()
+ cls.SCHEMA_BQ_WITH_EXTRA = cls.create_bq_schema(True)
+
+ cls.table_name1 = 'python_rd_table_1'
+ cls.table_schema1 = cls.create_table(
+ cls.table_name1, cls.TABLE_DATA_1, cls.SCHEMA_BQ)
+ table_id1 = '{}.{}'.format(cls.dataset_id, cls.table_name1)
+ cls.query1 = 'SELECT number, str FROM `%s`' % table_id1
+
+ cls.table_name2 = 'python_rd_table_2'
+ cls.table_schema2 = cls.create_table(
+ cls.table_name2, cls.TABLE_DATA_2, cls.SCHEMA_BQ)
+ table_id2 = '{}.{}'.format(cls.dataset_id, cls.table_name2)
+ cls.query2 = 'SELECT number, str FROM %s' % table_id2
+
+ cls.table_name3 = 'python_rd_table_3'
+ cls.table_schema3 = cls.create_table(
+ cls.table_name3, cls.TABLE_DATA_3, cls.SCHEMA_BQ_WITH_EXTRA)
+ table_id3 = '{}.{}'.format(cls.dataset_id, cls.table_name3)
+ cls.query3 = 'SELECT number, str, extra FROM `%s`' % table_id3
+
+ @classmethod
+ def create_table(cls, table_name, data, table_schema):
+ table = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId=cls.project, datasetId=cls.dataset_id,
+ tableId=table_name),
+ schema=table_schema)
+ request = bigquery.BigqueryTablesInsertRequest(
+ projectId=cls.project, datasetId=cls.dataset_id, table=table)
+ cls.bigquery_client.client.tables.Insert(request)
+ cls.bigquery_client.insert_rows(
+ cls.project, cls.dataset_id, table_name, data)
+ return table_schema
+
+ @classmethod
+ def create_bq_schema(cls, with_extra=False):
+ table_schema = bigquery.TableSchema()
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'number'
+ table_field.type = 'INTEGER'
+ table_field.mode = 'NULLABLE'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'str'
+ table_field.type = 'STRING'
+ table_field.mode = 'NULLABLE'
+ table_schema.fields.append(table_field)
+ if with_extra:
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'extra'
+ table_field.type = 'INTEGER'
+ table_field.mode = 'NULLABLE'
+ table_schema.fields.append(table_field)
+ return table_schema
+
+ @skip(['PortableRunner', 'FlinkRunner'])
+ @attr('IT')
+ def test_read_queries(self):
+ # TODO(BEAM-11311): Remove experiment when tests run on r_v2.
+ args = self.args + ["--experiments=use_runner_v2"]
+ with beam.Pipeline(argv=args) as p:
+ result = (
+ p
+ | beam.Create([
+ beam.io.ReadFromBigQueryRequest(query=self.query1),
+ beam.io.ReadFromBigQueryRequest(
+ query=self.query2, use_standard_sql=False),
Review comment:
We should also add an integration test.
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +127,258 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class _BigQueryReadSplit(beam.transforms.DoFn):
+ def __init__(
+ self,
+ options: PipelineOptions,
+ gcs_location: Union[str, ValueProvider] = None,
+ use_json_exports: bool = False,
+ bigquery_job_labels: Dict[str, str] = None,
+ step_name: str = None,
+ job_name: str = None,
+ unique_id: str = None,
+ kms_key: str = None,
+ project: str = None,
+ temp_dataset: Union[str, DatasetReference] = None):
+ self.options = options
+ self.use_json_exports = use_json_exports
+ self.gcs_location = gcs_location
+ self.bigquery_job_labels = bigquery_job_labels or {}
+ self._step_name = step_name
+ self._job_name = job_name or 'BQ_READ_SPLIT'
+ self._source_uuid = unique_id
+ self.kms_key = kms_key
+ self.project = project
+ self.temp_dataset = temp_dataset
+ self.bq_io_metadata = None
+
+ def display_data(self):
+ return {
+ 'use_json_exports': str(self.use_json_exports),
+ 'gcs_location': str(self.gcs_location),
+ 'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
+ 'kms_key': str(self.kms_key),
+ 'project': str(self.project),
+ 'temp_dataset': str(self.temp_dataset)
+ }
+
+ def _get_temp_dataset(self):
+ if isinstance(self.temp_dataset, str):
+ return DatasetReference(
+ datasetId=self.temp_dataset, projectId=self._get_project())
+ else:
+ return self.temp_dataset
+
+ def process(self, element: 'ReadFromBigQueryRequest', *args,
+ **kwargs) -> Iterable[BoundedSource]:
+ bq = bigquery_tools.BigQueryWrapper(
+ temp_dataset_id=(
+ self._get_temp_dataset().datasetId if self._get_temp_dataset(
+ ) else None))
+
+ if element.query is not None:
+ self._setup_temporary_dataset(bq, element)
+ table_reference = self._execute_query(bq, element)
+ else:
+ assert element.table
+ table_reference = bigquery_tools.parse_table_reference(
+ element.table, project=self._get_project())
+
+ if not table_reference.projectId:
+ table_reference.projectId = self._get_project()
+
+ schema, metadata_list = self._export_files(bq, element, table_reference)
+
+ for metadata in metadata_list:
+ yield self._create_source(metadata.path, schema)
+
+ if element.query is not None:
+ bq.clean_up_temporary_dataset(self._get_project())
+
+ def _get_bq_metadata(self):
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+ return self.bq_io_metadata
+
+ def _create_source(self, path, schema):
+ if not self.use_json_exports:
+ return _create_avro_source(path, use_fastavro=True)
+ else:
+ return _TextSource(
+ path,
+ min_bundle_size=0,
+ compression_type=CompressionTypes.UNCOMPRESSED,
+ strip_trailing_newlines=True,
+ coder=_JsonToDictCoder(schema))
+
+ def _setup_temporary_dataset(
+ self,
+ bq: bigquery_tools.BigQueryWrapper,
+ element: 'ReadFromBigQueryRequest'):
+ location = bq.get_query_location(
+ self._get_project(), element.query, not element.use_standard_sql)
+ bq.create_temporary_dataset(self._get_project(), location)
+
+ def _execute_query(
+ self,
+ bq: bigquery_tools.BigQueryWrapper,
+ element: 'ReadFromBigQueryRequest'):
+ query_job_name = bigquery_tools.generate_bq_job_name(
+ self._job_name,
+ self._source_uuid,
+ bigquery_tools.BigQueryJobTypes.QUERY,
+ random.randint(0, 1000))
+ job = bq._start_query_job(
+ self._get_project(),
+ element.query,
+ not element.use_standard_sql,
+ element.flatten_results,
+ job_id=query_job_name,
+ kms_key=self.kms_key,
+ job_labels=self._get_bq_metadata().add_additional_bq_job_labels(
+ self.bigquery_job_labels))
+ job_ref = job.jobReference
+ bq.wait_for_bq_job(job_ref, max_retries=0)
+ return bq._get_temp_table(self._get_project())
+
+ def _export_files(
+ self,
+ bq: bigquery_tools.BigQueryWrapper,
+ element: 'ReadFromBigQueryRequest',
+ table_reference: TableReference):
+ """Runs a BigQuery export job.
+
+ Returns:
+ bigquery.TableSchema instance, a list of FileMetadata instances
+ """
+ job_labels = self._get_bq_metadata().add_additional_bq_job_labels(
+ self.bigquery_job_labels)
+ export_job_name = bigquery_tools.generate_bq_job_name(
+ self._job_name,
+ self._source_uuid,
+ bigquery_tools.BigQueryJobTypes.EXPORT,
+ element.obj_id)
+ temp_location = self.options.view_as(GoogleCloudOptions).temp_location
+ gcs_location = bigquery_export_destination_uri(
+ self.gcs_location,
+ temp_location,
+ '%s%s' % (self._source_uuid, element.obj_id))
+ if self.use_json_exports:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ table_reference,
+ bigquery_tools.FileFormat.JSON,
+ project=self._get_project(),
+ job_labels=job_labels,
+ include_header=False)
+ else:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ table_reference,
+ bigquery_tools.FileFormat.AVRO,
+ project=self._get_project(),
+ include_header=False,
+ job_labels=job_labels,
+ use_avro_logical_types=True)
+ bq.wait_for_bq_job(job_ref)
+ metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+ if isinstance(table_reference, ValueProvider):
+ table_ref = bigquery_tools.parse_table_reference(
+ element.table, project=self._get_project())
+ else:
+ table_ref = table_reference
+ table = bq.get_table(
+ table_ref.projectId, table_ref.datasetId, table_ref.tableId)
+
+ return table.schema, metadata_list
+
+ def _get_project(self):
+ """Returns the project that queries and exports will be billed to."""
+
+ project = self.options.view_as(GoogleCloudOptions).project
+ if isinstance(project, ValueProvider):
+ project = project.get()
+ if not project:
+ project = self.project
+ return project
+
+
+FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')
+
+
+class _JsonToDictCoder(coders.Coder):
Review comment:
Is there a reason why we couldn't use existing coders ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]