TheNeuralBit commented on code in PR #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r936025661
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2716,23 +2726,53 @@ def __init__(
if isinstance(gcs_location, str):
gcs_location = StaticValueProvider(str, gcs_location)
+ if self.output_type == 'BEAM_ROW' and self._kwargs.get('query',
+ None) is not None:
+ raise ValueError(
+ "Both a query and an output type of 'BEAM_ROW' were specified. "
+ "'BEAM_ROW' is not currently supported with queries.")
+
self.gcs_location = gcs_location
self.bigquery_dataset_labels = {
'type': 'bq_direct_read_' + str(uuid.uuid4())[0:10]
}
- self._args = args
- self._kwargs = kwargs
def expand(self, pcoll):
if self.method is ReadFromBigQuery.Method.EXPORT:
- return self._expand_export(pcoll)
+ output_pcollection = self._expand_export(pcoll)
+ return ReadFromBigQuery._expand_output_type(self, output_pcollection)
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
- return self._expand_direct_read(pcoll)
+ output_pcollection = self._expand_direct_read(pcoll)
+ return ReadFromBigQuery._expand_output_type(self, output_pcollection)
Review Comment:
nit, you could just do this call once, like:
```py
if self.method is ReadFromBigQuery.Method.EXPORT:
output_pcollection = self._expand_export(pcoll)
elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
output_pcollection = self._expand_direct_read(pcoll)
else:
raise ValueError(
'The method to read from BigQuery must be either EXPORT '
'or DIRECT_READ.')
return self._expand_output_type(output_pcollection)
```
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,237 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+import unittest.mock
+
+import mock
+import numpy as np
+
+import apache_beam.io.gcp.bigquery
+from apache_beam.io.gcp import bigquery_schema_tools
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.options import value_provider
+
+try:
+ from apitools.base.py.exceptions import HttpError
+except ImportError:
+ HttpError = None
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryToSchema(unittest.TestCase):
+ def test_produce_pcoll_with_schema(self):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+
+ usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+ the_table_schema=schema)
+ self.assertEqual(
+ usertype.__annotations__,
+ {
+ 'stn': typing.Optional[str],
+ 'temp': typing.Sequence[np.float64],
+ 'count': np.int64
+ })
+
+ def test_produce_pcoll_with_empty_schema(self):
+ fields = []
+ schema = bigquery.TableSchema(fields=fields)
+
+ usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+ the_table_schema=schema)
+ self.assertEqual(usertype.__annotations__, {})
+
+ def test_unsupported_type(self):
+ fields = [
+ bigquery.TableFieldSchema(
+ name='number', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ with self.assertRaisesRegex(ValueError,
+ "Encountered an unsupported type: 'DOUBLE'"):
+ bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema=schema)
+
+ def test_unsupported_mode(self):
+ fields = [
+ bigquery.TableFieldSchema(name='number', type='INTEGER',
mode="NESTED"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ with self.assertRaisesRegex(ValueError,
+ "Encountered an unsupported mode: 'NESTED'"):
+ bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema=schema)
+
+ @mock.patch.object(BigQueryWrapper, 'get_table')
+ def test_bad_schema_public_api_export(self, get_table):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ table = apache_beam.io.gcp.internal.clients.bigquery.\
+ bigquery_v2_messages.Table(
+ schema=schema)
+ get_table.return_value = table
+
+ with self.assertRaisesRegex(ValueError,
+ "Encountered an unsupported type: 'DOUBLE'"):
+ p = apache_beam.Pipeline()
+ pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
+ table="dataset.sample_table",
+ method="EXPORT",
+ project="project",
+ output_type='BEAM_ROW')
+ pipeline
+
+ @mock.patch.object(BigQueryWrapper, 'get_table')
+ def test_bad_schema_public_api_direct_read(self, get_table):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ table = apache_beam.io.gcp.internal.clients.bigquery. \
+ bigquery_v2_messages.Table(
+ schema=schema)
+ get_table.return_value = table
+
+ with self.assertRaisesRegex(ValueError,
+ "Encountered an unsupported type: 'DOUBLE'"):
+ p = apache_beam.Pipeline()
+ pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
+ table="dataset.sample_table",
+ method="DIRECT_READ",
+ project="project",
+ output_type='BEAM_ROW')
+ pipeline
+
+ @mock.patch.object(BigQueryWrapper, 'get_table')
+ def test_unsupported_value_provider(self, get_table):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(
+ name='count', type='INTEGER', mode="NULLABLE")
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ table = apache_beam.io.gcp.internal.clients.bigquery. \
+ bigquery_v2_messages.Table(
+ schema=schema)
+ get_table.return_value = table
+
+ with self.assertRaisesRegex(TypeError,
+ 'ReadFromBigQuery: table must be of type string'
+ '; got %r instead' %
Review Comment:
nit: Can we just put the string for the type right in the literal?
```suggestion
'; got ValueProvider instead'
```
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tools used to work with Schema types in the context of BigQuery.
+Classes, constants and functions in this file are experimental and have no
+backwards compatibility guarantees.
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.portability.api import schema_pb2
+
+# BigQuery types as listed in
+# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in
+# https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String-
+BIG_QUERY_TO_PYTHON_TYPES = {
+ "STRING": str,
+ "INTEGER": np.int64,
+ "FLOAT64": np.float64,
+ "BOOLEAN": bool,
+ "BYTES": bytes,
+ #TODO(https://github.com/apache/beam/issues/20810):
+ # Finish mappings for all BQ types
+}
+
+
+def produce_pcoll_with_schema(the_table_schema):
Review Comment:
```suggestion
def generate_user_type_from_bq_schema(the_table_schema):
```
nit: this name is confusing, since it's actually creating a user type (that
can be used as a PCollection's element_type), not a PCollection.
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,237 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+import unittest.mock
+
+import mock
+import numpy as np
+
+import apache_beam.io.gcp.bigquery
+from apache_beam.io.gcp import bigquery_schema_tools
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.options import value_provider
+
+try:
+ from apitools.base.py.exceptions import HttpError
+except ImportError:
+ HttpError = None
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryToSchema(unittest.TestCase):
+ def test_produce_pcoll_with_schema(self):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+
+ usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+ the_table_schema=schema)
+ self.assertEqual(
+ usertype.__annotations__,
+ {
+ 'stn': typing.Optional[str],
+ 'temp': typing.Sequence[np.float64],
+ 'count': np.int64
+ })
+
+ def test_produce_pcoll_with_empty_schema(self):
+ fields = []
+ schema = bigquery.TableSchema(fields=fields)
+
+ usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+ the_table_schema=schema)
+ self.assertEqual(usertype.__annotations__, {})
+
+ def test_unsupported_type(self):
+ fields = [
+ bigquery.TableFieldSchema(
+ name='number', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ with self.assertRaisesRegex(ValueError,
+ "Encountered an unsupported type: 'DOUBLE'"):
+ bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema=schema)
+
+ def test_unsupported_mode(self):
+ fields = [
+ bigquery.TableFieldSchema(name='number', type='INTEGER',
mode="NESTED"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ with self.assertRaisesRegex(ValueError,
+ "Encountered an unsupported mode: 'NESTED'"):
+ bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema=schema)
+
+ @mock.patch.object(BigQueryWrapper, 'get_table')
+ def test_bad_schema_public_api_export(self, get_table):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ table = apache_beam.io.gcp.internal.clients.bigquery.\
+ bigquery_v2_messages.Table(
+ schema=schema)
+ get_table.return_value = table
+
+ with self.assertRaisesRegex(ValueError,
+ "Encountered an unsupported type: 'DOUBLE'"):
+ p = apache_beam.Pipeline()
+ pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
+ table="dataset.sample_table",
+ method="EXPORT",
+ project="project",
+ output_type='BEAM_ROW')
+ pipeline
+
+ @mock.patch.object(BigQueryWrapper, 'get_table')
+ def test_bad_schema_public_api_direct_read(self, get_table):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ table = apache_beam.io.gcp.internal.clients.bigquery. \
+ bigquery_v2_messages.Table(
+ schema=schema)
+ get_table.return_value = table
+
+ with self.assertRaisesRegex(ValueError,
+ "Encountered an unsupported type: 'DOUBLE'"):
+ p = apache_beam.Pipeline()
+ pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
+ table="dataset.sample_table",
+ method="DIRECT_READ",
+ project="project",
+ output_type='BEAM_ROW')
+ pipeline
+
+ @mock.patch.object(BigQueryWrapper, 'get_table')
+ def test_unsupported_value_provider(self, get_table):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(
+ name='count', type='INTEGER', mode="NULLABLE")
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ table = apache_beam.io.gcp.internal.clients.bigquery. \
+ bigquery_v2_messages.Table(
+ schema=schema)
+ get_table.return_value = table
Review Comment:
```suggestion
def test_unsupported_value_provider(self):
```
I don't think we need the mock here since we'll fail before getting to that
point (same goes for the other tests checking error messages)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]