chamikaramj commented on a change in pull request #13170:
URL: https://github.com/apache/beam/pull/13170#discussion_r528056191
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +127,264 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class _BigQueryReadSplit(beam.transforms.DoFn):
+ """Starts the process of reading from BigQuery.
+
+ This transform will start a BigQuery export job, and output a number of
+ file sources that are consumed downstream.
+ """
+ def __init__(
+ self,
+ options: PipelineOptions,
+ gcs_location: Union[str, ValueProvider] = None,
+ use_json_exports: bool = False,
+ bigquery_job_labels: Dict[str, str] = None,
+ step_name: str = None,
+ job_name: str = None,
+ unique_id: str = None,
+ kms_key: str = None,
+ project: str = None,
+ temp_dataset: Union[str, DatasetReference] = None):
+ self.options = options
+ self.use_json_exports = use_json_exports
+ self.gcs_location = gcs_location
+ self.bigquery_job_labels = bigquery_job_labels or {}
+ self._step_name = step_name
+ self._job_name = job_name or 'BQ_READ_SPLIT'
+ self._source_uuid = unique_id
+ self.kms_key = kms_key
+ self.project = project
+ self.temp_dataset = temp_dataset or 'bq_read_all_%s' % uuid.uuid4().hex
+ self.bq_io_metadata = None
+
+ def display_data(self):
+ return {
+ 'use_json_exports': str(self.use_json_exports),
+ 'gcs_location': str(self.gcs_location),
+ 'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
+ 'kms_key': str(self.kms_key),
+ 'project': str(self.project),
+ 'temp_dataset': str(self.temp_dataset)
+ }
+
+ def _get_temp_dataset(self):
+ if isinstance(self.temp_dataset, str):
+ return DatasetReference(
+ datasetId=self.temp_dataset, projectId=self._get_project())
+ else:
+ return self.temp_dataset
+
+ def process(self, element: 'ReadFromBigQueryRequest', *args,
+ **kwargs) -> Iterable[BoundedSource]:
+ bq = bigquery_tools.BigQueryWrapper(
+ temp_dataset_id=self._get_temp_dataset().datasetId)
+
+ if element.query is not None:
+ self._setup_temporary_dataset(bq, element)
+ table_reference = self._execute_query(bq, element)
+ else:
+ assert element.table
+ table_reference = bigquery_tools.parse_table_reference(
+ element.table, project=self._get_project())
+
+ if not table_reference.projectId:
+ table_reference.projectId = self._get_project()
+
+ schema, metadata_list = self._export_files(bq, element, table_reference)
+
+ for metadata in metadata_list:
+ yield self._create_source(metadata.path, schema)
+
+ if element.query is not None:
+ bq._delete_table(
Review comment:
We should also clean up the temporary datasets that are created by the
pipeline (maybe at the end of every window).
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
##########
@@ -100,3 +127,258 @@ def process(self, unused_element, unused_signal,
gcs_locations):
)
return main_output
+
+
+class _BigQueryReadSplit(beam.transforms.DoFn):
+ def __init__(
+ self,
+ options: PipelineOptions,
+ gcs_location: Union[str, ValueProvider] = None,
+ use_json_exports: bool = False,
+ bigquery_job_labels: Dict[str, str] = None,
+ step_name: str = None,
+ job_name: str = None,
+ unique_id: str = None,
+ kms_key: str = None,
+ project: str = None,
+ temp_dataset: Union[str, DatasetReference] = None):
+ self.options = options
+ self.use_json_exports = use_json_exports
+ self.gcs_location = gcs_location
+ self.bigquery_job_labels = bigquery_job_labels or {}
+ self._step_name = step_name
+ self._job_name = job_name or 'BQ_READ_SPLIT'
+ self._source_uuid = unique_id
+ self.kms_key = kms_key
+ self.project = project
+ self.temp_dataset = temp_dataset
+ self.bq_io_metadata = None
+
+ def display_data(self):
+ return {
+ 'use_json_exports': str(self.use_json_exports),
+ 'gcs_location': str(self.gcs_location),
+ 'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
+ 'kms_key': str(self.kms_key),
+ 'project': str(self.project),
+ 'temp_dataset': str(self.temp_dataset)
+ }
+
+ def _get_temp_dataset(self):
+ if isinstance(self.temp_dataset, str):
+ return DatasetReference(
+ datasetId=self.temp_dataset, projectId=self._get_project())
+ else:
+ return self.temp_dataset
+
+ def process(self, element: 'ReadFromBigQueryRequest', *args,
+ **kwargs) -> Iterable[BoundedSource]:
+ bq = bigquery_tools.BigQueryWrapper(
+ temp_dataset_id=(
+ self._get_temp_dataset().datasetId if self._get_temp_dataset(
+ ) else None))
+
+ if element.query is not None:
+ self._setup_temporary_dataset(bq, element)
+ table_reference = self._execute_query(bq, element)
+ else:
+ assert element.table
+ table_reference = bigquery_tools.parse_table_reference(
+ element.table, project=self._get_project())
+
+ if not table_reference.projectId:
+ table_reference.projectId = self._get_project()
+
+ schema, metadata_list = self._export_files(bq, element, table_reference)
+
+ for metadata in metadata_list:
+ yield self._create_source(metadata.path, schema)
+
+ if element.query is not None:
+ bq.clean_up_temporary_dataset(self._get_project())
+
+ def _get_bq_metadata(self):
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+ return self.bq_io_metadata
+
+ def _create_source(self, path, schema):
+ if not self.use_json_exports:
+ return _create_avro_source(path, use_fastavro=True)
+ else:
+ return _TextSource(
+ path,
+ min_bundle_size=0,
+ compression_type=CompressionTypes.UNCOMPRESSED,
+ strip_trailing_newlines=True,
+ coder=_JsonToDictCoder(schema))
+
+ def _setup_temporary_dataset(
+ self,
+ bq: bigquery_tools.BigQueryWrapper,
+ element: 'ReadFromBigQueryRequest'):
+ location = bq.get_query_location(
+ self._get_project(), element.query, not element.use_standard_sql)
+ bq.create_temporary_dataset(self._get_project(), location)
+
+ def _execute_query(
+ self,
+ bq: bigquery_tools.BigQueryWrapper,
+ element: 'ReadFromBigQueryRequest'):
+ query_job_name = bigquery_tools.generate_bq_job_name(
+ self._job_name,
+ self._source_uuid,
+ bigquery_tools.BigQueryJobTypes.QUERY,
+ random.randint(0, 1000))
+ job = bq._start_query_job(
+ self._get_project(),
+ element.query,
+ not element.use_standard_sql,
+ element.flatten_results,
+ job_id=query_job_name,
+ kms_key=self.kms_key,
+ job_labels=self._get_bq_metadata().add_additional_bq_job_labels(
+ self.bigquery_job_labels))
+ job_ref = job.jobReference
+ bq.wait_for_bq_job(job_ref, max_retries=0)
+ return bq._get_temp_table(self._get_project())
+
+ def _export_files(
+ self,
+ bq: bigquery_tools.BigQueryWrapper,
+ element: 'ReadFromBigQueryRequest',
+ table_reference: TableReference):
+ """Runs a BigQuery export job.
+
+ Returns:
+ bigquery.TableSchema instance, a list of FileMetadata instances
+ """
+ job_labels = self._get_bq_metadata().add_additional_bq_job_labels(
+ self.bigquery_job_labels)
+ export_job_name = bigquery_tools.generate_bq_job_name(
+ self._job_name,
+ self._source_uuid,
+ bigquery_tools.BigQueryJobTypes.EXPORT,
+ element.obj_id)
+ temp_location = self.options.view_as(GoogleCloudOptions).temp_location
+ gcs_location = bigquery_export_destination_uri(
+ self.gcs_location,
+ temp_location,
+ '%s%s' % (self._source_uuid, element.obj_id))
+ if self.use_json_exports:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ table_reference,
+ bigquery_tools.FileFormat.JSON,
+ project=self._get_project(),
+ job_labels=job_labels,
+ include_header=False)
+ else:
+ job_ref = bq.perform_extract_job([gcs_location],
+ export_job_name,
+ table_reference,
+ bigquery_tools.FileFormat.AVRO,
+ project=self._get_project(),
+ include_header=False,
+ job_labels=job_labels,
+ use_avro_logical_types=True)
+ bq.wait_for_bq_job(job_ref)
+ metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+ if isinstance(table_reference, ValueProvider):
+ table_ref = bigquery_tools.parse_table_reference(
+ element.table, project=self._get_project())
+ else:
+ table_ref = table_reference
+ table = bq.get_table(
+ table_ref.projectId, table_ref.datasetId, table_ref.tableId)
+
+ return table.schema, metadata_list
+
+ def _get_project(self):
+ """Returns the project that queries and exports will be billed to."""
+
+ project = self.options.view_as(GoogleCloudOptions).project
+ if isinstance(project, ValueProvider):
+ project = project.get()
+ if not project:
+ project = self.project
+ return project
+
+
+FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')
+
+
+class _JsonToDictCoder(coders.Coder):
Review comment:
Ah ok. Thanks.
##########
File path: sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
##########
@@ -298,6 +298,109 @@ def test_iobase_source(self):
assert_that(result, equal_to(self.get_expected_data(native=False)))
+class ReadAllBQTests(BigQueryReadIntegrationTests):
+ TABLE_DATA_1 = [{
+ 'number': 1, 'str': 'abc'
+ }, {
+ 'number': 2, 'str': 'def'
+ }, {
+ 'number': 3, 'str': u'你好'
+ }, {
+ 'number': 4, 'str': u'привет'
+ }]
+
+ TABLE_DATA_2 = [{
+ 'number': 10, 'str': 'abcd'
+ }, {
+ 'number': 20, 'str': 'defg'
+ }, {
+ 'number': 30, 'str': u'你好'
+ }, {
+ 'number': 40, 'str': u'привет'
+ }]
+
+ TABLE_DATA_3 = [{'number': 10, 'str': 'abcde', 'extra': 3}]
+
+ @classmethod
+ def setUpClass(cls):
+ super(ReadAllBQTests, cls).setUpClass()
+ cls.SCHEMA_BQ = cls.create_bq_schema()
+ cls.SCHEMA_BQ_WITH_EXTRA = cls.create_bq_schema(True)
+
+ cls.table_name1 = 'python_rd_table_1'
+ cls.table_schema1 = cls.create_table(
+ cls.table_name1, cls.TABLE_DATA_1, cls.SCHEMA_BQ)
+ table_id1 = '{}.{}'.format(cls.dataset_id, cls.table_name1)
+ cls.query1 = 'SELECT number, str FROM `%s`' % table_id1
+
+ cls.table_name2 = 'python_rd_table_2'
+ cls.table_schema2 = cls.create_table(
+ cls.table_name2, cls.TABLE_DATA_2, cls.SCHEMA_BQ)
+ table_id2 = '{}.{}'.format(cls.dataset_id, cls.table_name2)
+ cls.query2 = 'SELECT number, str FROM %s' % table_id2
+
+ cls.table_name3 = 'python_rd_table_3'
+ cls.table_schema3 = cls.create_table(
+ cls.table_name3, cls.TABLE_DATA_3, cls.SCHEMA_BQ_WITH_EXTRA)
+ table_id3 = '{}.{}'.format(cls.dataset_id, cls.table_name3)
+ cls.query3 = 'SELECT number, str, extra FROM `%s`' % table_id3
+
+ @classmethod
+ def create_table(cls, table_name, data, table_schema):
+ table = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId=cls.project, datasetId=cls.dataset_id,
+ tableId=table_name),
+ schema=table_schema)
+ request = bigquery.BigqueryTablesInsertRequest(
+ projectId=cls.project, datasetId=cls.dataset_id, table=table)
+ cls.bigquery_client.client.tables.Insert(request)
+ cls.bigquery_client.insert_rows(
+ cls.project, cls.dataset_id, table_name, data)
+ return table_schema
+
+ @classmethod
+ def create_bq_schema(cls, with_extra=False):
+ table_schema = bigquery.TableSchema()
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'number'
+ table_field.type = 'INTEGER'
+ table_field.mode = 'NULLABLE'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'str'
+ table_field.type = 'STRING'
+ table_field.mode = 'NULLABLE'
+ table_schema.fields.append(table_field)
+ if with_extra:
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'extra'
+ table_field.type = 'INTEGER'
+ table_field.mode = 'NULLABLE'
+ table_schema.fields.append(table_field)
+ return table_schema
+
+ @skip(['PortableRunner', 'FlinkRunner'])
+ @attr('IT')
+ def test_read_queries(self):
+ # TODO(BEAM-11311): Remove experiment when tests run on r_v2.
+ args = self.args + ["--experiments=use_runner_v2"]
+ with beam.Pipeline(argv=args) as p:
+ result = (
+ p
+ | beam.Create([
+ beam.io.ReadFromBigQueryRequest(query=self.query1),
+ beam.io.ReadFromBigQueryRequest(
+ query=self.query2, use_standard_sql=False),
Review comment:
I see. Thanks. No, this is good.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]