TheNeuralBit commented on code in PR #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r927153356
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2459,14 +2463,35 @@ def __init__(
self.bigquery_dataset_labels = {
'type': 'bq_direct_read_' + str(uuid.uuid4())[0:10]
}
+ self.output_type = output_type
self._args = args
self._kwargs = kwargs
def expand(self, pcoll):
if self.method is ReadFromBigQuery.Method.EXPORT:
- return self._expand_export(pcoll)
+ output_pcollection = self._expand_export(pcoll)
+ if self.output_type == 'BEAM_ROWS':
+ return output_pcollection | ReadFromBigQuery._convert_to_usertype(
+ beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+ project_id=output_pcollection.pipeline.options.view_as(
+ GoogleCloudOptions).project,
+ dataset_id=str(self._kwargs['table']).split('.',
maxsplit=1)[0],
+ table_id=str(self._kwargs['table']).rsplit(
+ '.', maxsplit=1)[-1]).schema)
+ else:
+ return output_pcollection
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
- return self._expand_direct_read(pcoll)
+ output_pcollection = self._expand_direct_read(pcoll)
+ if self.output_type == 'BEAM_ROWS':
+ return output_pcollection | ReadFromBigQuery._convert_to_usertype(
+ beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+ project_id=output_pcollection.pipeline.options.view_as(
+ GoogleCloudOptions).project,
+ dataset_id=str(self._kwargs['table']).split('.',
maxsplit=1)[0],
+ table_id=str(self._kwargs['table']).rsplit(
+ '.', maxsplit=1)[-1]).schema)
+ else:
+ return output_pcollection
Review Comment:
Could we move this logic outside the if, instead of duplicating it?
Something like this:
```
if self.method is ReadFromBigQuery.Method.EXPORT:
output_pcollection = self._expand_export(pcoll)
if self.output_type == 'BEAM_ROWS':
return output_pcollection | ReadFromBigQuery._convert_to_usertype(
beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
project_id=output_pcollection.pipeline.options.view_as(
GoogleCloudOptions).project,
dataset_id=str(self._kwargs['table']).split('.',
maxsplit=1)[0],
table_id=str(self._kwargs['table']).rsplit(
'.', maxsplit=1)[-1]).schema)
else:
return output_pcollection
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
output_pcollection = self._expand_direct_read(pcoll)
if self.output_type == 'BEAM_ROWS':
return output_pcollection | ReadFromBigQuery._convert_to_usertype(
beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
project_id=output_pcollection.pipeline.options.view_as(
GoogleCloudOptions).project,
dataset_id=str(self._kwargs['table']).split('.',
maxsplit=1)[0],
table_id=str(self._kwargs['table']).rsplit(
'.', maxsplit=1)[-1]).schema)
else:
return output_pcollection
```
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2520,33 +2545,39 @@ def _expand_direct_read(self, pcoll):
else:
project_id = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
- def _get_pipeline_details(unused_elm):
- pipeline_details = {}
- if temp_table_ref is not None:
- pipeline_details['temp_table_ref'] = temp_table_ref
- elif project_id is not None:
- pipeline_details['project_id'] = project_id
- pipeline_details[
- 'bigquery_dataset_labels'] = self.bigquery_dataset_labels
- return pipeline_details
-
- project_to_cleanup_pcoll = beam.pvalue.AsList(
- pcoll.pipeline
- | 'ProjectToCleanupImpulse' >> beam.Create([None])
- | 'MapProjectToCleanup' >> beam.Map(_get_pipeline_details))
-
- return (
- pcoll
- | beam.io.Read(
- _CustomBigQueryStorageSource(
- pipeline_options=pcoll.pipeline.options,
- method=self.method,
- use_native_datetime=self.use_native_datetime,
- temp_table=temp_table_ref,
- bigquery_dataset_labels=self.bigquery_dataset_labels,
- *self._args,
- **self._kwargs))
- | _PassThroughThenCleanupTempDatasets(project_to_cleanup_pcoll))
+ def _get_pipeline_details(unused_elm):
+ pipeline_details = {}
+ if temp_table_ref is not None:
+ pipeline_details['temp_table_ref'] = temp_table_ref
+ elif project_id is not None:
+ pipeline_details['project_id'] = project_id
+ pipeline_details[
+ 'bigquery_dataset_labels'] = self.bigquery_dataset_labels
+ return pipeline_details
+
+ project_to_cleanup_pcoll = beam.pvalue.AsList(
+ pcoll.pipeline
+ | 'ProjectToCleanupImpulse' >> beam.Create([None])
+ | 'MapProjectToCleanup' >> beam.Map(_get_pipeline_details))
+
+ return (
+ pcoll
+ | beam.io.Read(
+ _CustomBigQueryStorageSource(
+ pipeline_options=pcoll.pipeline.options,
+ method=self.method,
+ use_native_datetime=self.use_native_datetime,
+ temp_table=temp_table_ref,
+ bigquery_dataset_labels=self.bigquery_dataset_labels,
+ *self._args,
+ **self._kwargs))
+ | _PassThroughThenCleanupTempDatasets(project_to_cleanup_pcoll))
Review Comment:
Did you intend to indent this? It doesn't look like it changed otherwise.
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+import unittest.mock
+
+import mock
+import numpy as np
+
+import apache_beam.io.gcp.bigquery
+from apache_beam.io.gcp import bigquery_schema_tools
+from apache_beam.io.gcp.bigquery_test import HttpError
Review Comment:
Please import this dependency directly instead of re-importing it through
bigquery_test. It's ok to just duplicate this logic:
https://github.com/apache/beam/blob/f2f239a44f490f4ca811361473754d07bc98b6c6/sdks/python/apache_beam/io/gcp/bigquery_test.py#L81-L91
That will make the skipIf below much more clear.
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tools used tool work with Schema types in the context of BigQuery.
+Classes, constants and functions in this file are experimental and have no
+backwards compatibility guarantees.
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.portability.api import schema_pb2
+
+# BigQuery types as listed in
+# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in
+#
https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String-
+BIG_QUERY_TO_PYTHON_TYPES = {
+ "STRING": str,
+ "INTEGER": np.int64,
+ "FLOAT64": np.float64,
+ "BOOLEAN": bool,
+ "BYTES": bytes,
+ "TIMESTAMP": beam.utils.timestamp.Timestamp,
+ #TODO svetaksundhar@: Finish mappings for all BQ types
Review Comment:
```suggestion
#TODO(https://github.com/apache/beam/issues/20810): Finish mappings for
all BQ types
```
Linking an issue is preferable, we can just re-use #20810 for now.
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2520,33 +2545,39 @@ def _expand_direct_read(self, pcoll):
else:
project_id = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
- def _get_pipeline_details(unused_elm):
- pipeline_details = {}
- if temp_table_ref is not None:
- pipeline_details['temp_table_ref'] = temp_table_ref
- elif project_id is not None:
- pipeline_details['project_id'] = project_id
- pipeline_details[
- 'bigquery_dataset_labels'] = self.bigquery_dataset_labels
- return pipeline_details
-
- project_to_cleanup_pcoll = beam.pvalue.AsList(
- pcoll.pipeline
- | 'ProjectToCleanupImpulse' >> beam.Create([None])
- | 'MapProjectToCleanup' >> beam.Map(_get_pipeline_details))
-
- return (
- pcoll
- | beam.io.Read(
- _CustomBigQueryStorageSource(
- pipeline_options=pcoll.pipeline.options,
- method=self.method,
- use_native_datetime=self.use_native_datetime,
- temp_table=temp_table_ref,
- bigquery_dataset_labels=self.bigquery_dataset_labels,
- *self._args,
- **self._kwargs))
- | _PassThroughThenCleanupTempDatasets(project_to_cleanup_pcoll))
+ def _get_pipeline_details(unused_elm):
+ pipeline_details = {}
+ if temp_table_ref is not None:
+ pipeline_details['temp_table_ref'] = temp_table_ref
+ elif project_id is not None:
+ pipeline_details['project_id'] = project_id
+ pipeline_details[
+ 'bigquery_dataset_labels'] = self.bigquery_dataset_labels
+ return pipeline_details
+
+ project_to_cleanup_pcoll = beam.pvalue.AsList(
+ pcoll.pipeline
+ | 'ProjectToCleanupImpulse' >> beam.Create([None])
+ | 'MapProjectToCleanup' >> beam.Map(_get_pipeline_details))
+
+ return (
+ pcoll
+ | beam.io.Read(
+ _CustomBigQueryStorageSource(
+ pipeline_options=pcoll.pipeline.options,
+ method=self.method,
+ use_native_datetime=self.use_native_datetime,
+ temp_table=temp_table_ref,
+ bigquery_dataset_labels=self.bigquery_dataset_labels,
+ *self._args,
+ **self._kwargs))
+ | _PassThroughThenCleanupTempDatasets(project_to_cleanup_pcoll))
+
+ def _convert_to_usertype(table_schema):
+ usertype = beam.io.gcp.bigquery_schema_tools. \
+ produce_pcoll_with_schema(table_schema)
+ return beam.ParDo(
+ beam.io.gcp.bigquery_schema_tools.BeamSchemaConversionDoFn(usertype))
Review Comment:
nit: I think this helper belongs in `bigquery_schema_tools` as well
##########
sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py:
##########
@@ -178,6 +180,30 @@ def test_iobase_source(self):
query=query, use_standard_sql=True, project=self.project))
assert_that(result, equal_to(self.TABLE_DATA))
+ @pytest.mark.it_postcommit
+ def test_table_schema_retrieve(self):
+ the_table = bigquery_tools.BigQueryWrapper().get_table(
+ project_id="apache-beam-testing",
+ dataset_id="beam_bigquery_io_test",
+ table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
+ table = the_table.schema
+ utype = bigquery_schema_tools.produce_pcoll_with_schema(table)
+ with beam.Pipeline(argv=self.args) as p:
+ result = (
+ p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
+ gcs_location="gs://bqio_schema_test",
+ table="beam_bigquery_io_test.dfsqltable_3c7d6fd5_16e0460dfd0",
+ project="apache-beam-testing",
+ output_type='BEAM_ROWS'))
+ assert_that(
+ result,
+ equal_to([
+ utype(id=3, name='customer1', type='test'),
+ utype(id=1, name='customer1', type='test'),
+ utype(id=2, name='customer2', type='test'),
+ utype(id=4, name='customer2', type='test')
+ ]))
Review Comment:
Could you also test this with DIRECT_READ mode?
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+import unittest.mock
+
+import mock
+import numpy as np
+
+import apache_beam.io.gcp.bigquery
+from apache_beam.io.gcp import bigquery_schema_tools
+from apache_beam.io.gcp.bigquery_test import HttpError
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.internal.clients.bigquery import Table
+
+
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryToSchema(unittest.TestCase):
Review Comment:
It's not strictly necessary for this PR, but I think it would helpful to
have a test here that makes a `BeamSchemaConversionDoFn` for various schemas
and pushes data through it. That will give us a quick way to test support for
new types.
I think it's fine to hold off on that for a later PR though, if you want.
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2422,6 +2422,9 @@ class ReadFromBigQuery(PTransform):
to run queries with INTERACTIVE priority. This option is ignored when
reading from a table rather than a query. To learn more about query
priority, see: https://cloud.google.com/bigquery/docs/running-queries
+ output_type (str): By default, the schema returned from this transform
+ would be of type PYTHON_DICT. Other schema types can be specified
+ ("BEAM_ROW").
Review Comment:
Technically the default is not to produce a schema. Also I think we should
be clear that this is still in-progress, given the limited type support. I'd
phrase it something like this:
```suggestion
output_type (str): By default, this source yields Python dictionaries
(`PYTHON_DICT`). There is experimental support for producing a PCollection with
a schema and yielding Beam Rows via the option `BEAM_ROW`.
```
It might be nice to put a link to the schema docs
(https://beam.apache.org/documentation/programming-guide/#what-is-a-schema) in
there too.
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2459,14 +2463,35 @@ def __init__(
self.bigquery_dataset_labels = {
'type': 'bq_direct_read_' + str(uuid.uuid4())[0:10]
}
+ self.output_type = output_type
self._args = args
self._kwargs = kwargs
def expand(self, pcoll):
if self.method is ReadFromBigQuery.Method.EXPORT:
- return self._expand_export(pcoll)
+ output_pcollection = self._expand_export(pcoll)
+ if self.output_type == 'BEAM_ROWS':
+ return output_pcollection | ReadFromBigQuery._convert_to_usertype(
+ beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+ project_id=output_pcollection.pipeline.options.view_as(
+ GoogleCloudOptions).project,
+ dataset_id=str(self._kwargs['table']).split('.',
maxsplit=1)[0],
+ table_id=str(self._kwargs['table']).rsplit(
+ '.', maxsplit=1)[-1]).schema)
+ else:
+ return output_pcollection
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
- return self._expand_direct_read(pcoll)
+ output_pcollection = self._expand_direct_read(pcoll)
+ if self.output_type == 'BEAM_ROWS':
+ return output_pcollection | ReadFromBigQuery._convert_to_usertype(
+ beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+ project_id=output_pcollection.pipeline.options.view_as(
+ GoogleCloudOptions).project,
+ dataset_id=str(self._kwargs['table']).split('.',
maxsplit=1)[0],
+ table_id=str(self._kwargs['table']).rsplit(
+ '.', maxsplit=1)[-1]).schema)
+ else:
+ return output_pcollection
Review Comment:
Could we move this logic outside the if, instead of duplicating it?
Something like this:
```py
if self.method is ReadFromBigQuery.Method.EXPORT:
output_pcollection = self._expand_export(pcoll)
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
output_pcollection = self._expand_direct_read(pcoll)
if self.output_type == 'BEAM_ROWS':
return output_pcollection | ReadFromBigQuery._convert_to_usertype(
beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
project_id=output_pcollection.pipeline.options.view_as(
GoogleCloudOptions).project,
dataset_id=str(self._kwargs['table']).split('.',
maxsplit=1)[0],
table_id=str(self._kwargs['table']).rsplit(
'.', maxsplit=1)[-1]).schema)
elif output_type == 'PYTHON_DICT':
return output_collection
else:
raise ...
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]