[ https://issues.apache.org/jira/browse/BEAM-11587?focusedWorklogId=772089&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772089 ]
ASF GitHub Bot logged work on BEAM-11587:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 18/May/22 19:09
Start Date: 18/May/22 19:09
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r876239026
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+import unittest
+
+import numpy as np
+
+from apache_beam.io.gcp import bigquery_schema_tools
+from apache_beam.io.gcp.bigquery_test import HttpError
+from apache_beam.io.gcp.internal.clients import bigquery
+
+
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryToSchema(unittest.TestCase):
+ def test_produce_pcoll_with_schema(self):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None")
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+
+ usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+ the_table_schema=schema)
+ self.assertEqual(
+ usertype.__annotations__,
+ {
+ 'stn': typing.Optional[str],
+ 'temp': typing.Sequence[np.float64],
+ 'count': np.int64
+ })
+
+ def test_produce_pcoll_with_empty_schema(self):
+ fields = []
+ schema = bigquery.TableSchema(fields=fields)
+
+ usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+ the_table_schema=schema)
+ self.assertEqual(usertype.__annotations__, {})
+
+ def test_error_at_runtime(self):
+ fields = [
+ bigquery.TableFieldSchema(
+ name='number', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None")
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ with self.assertRaises(ValueError):
Review Comment:
Could you use `assertRaisesRegex` and make sure the message matches the one
you've defined?
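A minimal sketch of what that check could look like. `check_mode` is a hypothetical stand-in for the mode handling in `bq_field_to_type`, and the expected message assumes the wording proposed further down in this review. `assertRaisesRegex` applies `re.search` to the string form of the exception, so wrapping the expected text in `re.escape` keeps punctuation in the message from being read as regex syntax.

```python
import re
import unittest


def check_mode(mode):
  """Hypothetical stand-in for the mode check in bq_field_to_type."""
  if mode not in ("NULLABLE", "REPEATED", "None", ""):
    raise ValueError(f"Encountered an unsupported mode: {mode!r}")


class ModeErrorTest(unittest.TestCase):
  def test_unsupported_mode_message(self):
    expected = "Encountered an unsupported mode: 'FOO'"
    # assertRaisesRegex runs re.search against str(exception); re.escape
    # makes any regex metacharacters in the expected text match literally.
    with self.assertRaisesRegex(ValueError, re.escape(expected)):
      check_mode("FOO")
```

Unlike a bare `assertRaises(ValueError)`, this fails if some unrelated `ValueError` is raised first.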
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tools used tool work with Schema types in the context of BigQuery.
+Classes, constants and functions in this file are experimental and have no
+backwards compatibility guarantees.
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.io.gcp.internal.clients import bigquery
+
+# BigQuery types as listed in
+# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in
+# https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String-
+BIG_QUERY_TO_PYTHON_TYPES = {
+ "STRING": str,
+ "INTEGER": np.int64,
+ "FLOAT64": np.float64,
+ "BOOLEAN": bool,
+ "BYTES": bytes,
+ "TIMESTAMP": beam.utils.timestamp.Timestamp,
+ #TODO svetaksundhar@: Finish mappings for all BQ types
+}
+
+
+def produce_pcoll_with_schema(the_table_schema):
+ #type: (bigquery.TableSchema) -> type
+
+ """Convert a schema of type TableSchema into a pcollection element.
+ Args:
+ the_table_schema: A BQ schema of type TableSchema
+ Returns:
+ type: type that can be used to work with pCollections.
+ """
+
+ the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
+ the_table_schema)
+ if the_schema == {}:
+ raise ValueError("The schema is empty")
+ dict_of_tuples = []
+ for i in range(len(the_schema['fields'])):
+ if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES:
+ typ = bq_field_to_type(
+ the_schema['fields'][i]['type'], the_schema['fields'][i]['mode'])
+ else:
+ raise ValueError(the_schema['fields'][i]['type'])
+ # TODO svetaksundhar@: Map remaining BQ types
+ dict_of_tuples.append((the_schema['fields'][i]['name'], typ))
+ sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples)
+ usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema)
+ return usertype
+
+
+def produce_pcoll_using_bqio(project_id, dataset_id, table_id):
+ the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper(
+ ).get_table(project_id, dataset_id, table_id)
+ beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema)
+
+
+def bq_field_to_type(field, mode):
+ if mode == 'NULLABLE':
+ return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]]
+ elif mode == 'REPEATED':
+ return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]]
+ elif mode == 'None' or mode == '':
+ return BIG_QUERY_TO_PYTHON_TYPES[field]
+ else:
+ return ValueError("Not a supported mode")
Review Comment:
Please make this reference the unsupported mode.
```suggestion
raise ValueError(f"Encountered an unsupported mode: {mode!r}")
```
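Separately from the message, the current line returns the `ValueError` instead of raising it, so a caller would receive an exception object where it expects a type hint. A minimal sketch of the function with the error raised, mirroring the PR's handling of the `'None'` and empty modes; the reduced `BQ_TO_PY` map with builtin types is an assumption made to keep the sketch dependency-free (the PR maps to numpy types).

```python
from typing import Optional, Sequence

# Illustrative subset of the mapping; the PR maps to numpy types such as
# np.int64 and np.float64, builtins keep this sketch dependency-free.
BQ_TO_PY = {"STRING": str, "INTEGER": int, "FLOAT64": float, "BOOLEAN": bool}


def bq_field_to_type(field_type, mode):
  """Maps a BigQuery field type and mode to a Python type hint."""
  base = BQ_TO_PY[field_type]
  if mode == "NULLABLE":
    return Optional[base]
  if mode == "REPEATED":
    return Sequence[base]
  if mode in ("None", ""):
    return base
  # Raise rather than return, so an invalid mode can never be mistaken for a
  # type hint further down the line.
  raise ValueError(f"Encountered an unsupported mode: {mode!r}")
```

With this version, `bq_field_to_type("STRING", "FOO")` fails immediately with `ValueError: Encountered an unsupported mode: 'FOO'`.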
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2422,6 +2423,9 @@ class ReadFromBigQuery(PTransform):
to run queries with INTERACTIVE priority. This option is ignored when
reading from a table rather than a query. To learn more about query
priority, see: https://cloud.google.com/bigquery/docs/running-queries
+ output_type (str): By default, the schema returned from this transform
+ would be of type TableSchema. Other schema types can be specified
+ ("BEAM_SCHEMAS").
Review Comment:
This argument should refer to the type of the elements in the output
PCollection. The status quo (what should become the default) is to produce
Python `dict`s; let's call this `"PYTHON_DICT"` and make it the default rather
than `None`.
For this new feature, I think we should call it `"BEAM_ROW"` to standardize
with terminology used elsewhere in Beam (a *Row* is an instance/element, whose
type is described by a *Schema*).
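A minimal sketch of how the argument could be validated up front, assuming the two values proposed here with `"PYTHON_DICT"` as the default. `ReadOptionsSketch` and the constant names are hypothetical stand-ins for the argument handling in `ReadFromBigQuery.__init__`, not its actual code.

```python
# Hypothetical constant names; only the string values come from this review.
PYTHON_DICT = "PYTHON_DICT"
BEAM_ROW = "BEAM_ROW"
_SUPPORTED_OUTPUT_TYPES = (PYTHON_DICT, BEAM_ROW)


class ReadOptionsSketch:
  """Stand-in for the output_type handling in ReadFromBigQuery.__init__."""

  def __init__(self, output_type=PYTHON_DICT):
    # Reject unknown values when the transform is constructed, before any
    # BigQuery call is made.
    if output_type not in _SUPPORTED_OUTPUT_TYPES:
      raise ValueError(
          f"output_type must be one of {_SUPPORTED_OUTPUT_TYPES}, "
          f"got {output_type!r}")
    self.output_type = output_type
```

Failing in the constructor means a value such as the docstring's `"BEAM_SCHEMAS"` is reported during pipeline construction rather than surfacing later.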
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,112 @@
+class BeamSchemaUnbatchDoFn(beam.DoFn):
+ def __init__(self, pcoll_val_ctor):
+ self._pcoll_val_ctor = pcoll_val_ctor
+
+ def infer_output_type(self, input_type):
+ return self._pcoll_val_ctor
+
+ @classmethod
+ def _from_serialized_schema(cls, dict_of_tuples):
+ return cls(
+ beam.typehints.schemas.named_tuple_from_schema(
+ beam.dataframe.schemas.proto_utils.parse_Bytes(
Review Comment:
Oh, I think this is picking up the `proto_utils` module that
`beam.dataframe.schemas` imports, and the same thing happens below for
`schema_pb2`. Please update this to reference the underlying packages directly
rather than going through `beam.dataframe`.
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2434,10 +2438,12 @@ def __init__(
gcs_location=None,
method=None,
use_native_datetime=False,
+ output_type=None,
*args,
**kwargs):
self.method = method or ReadFromBigQuery.Method.EXPORT
self.use_native_datetime = use_native_datetime
+ self.output_type = output_type
Review Comment:
It looks like this isn't actually used anywhere. We should inspect this
variable when the transform is expanded, and if it is `"BEAM_ROW"`, we should
use your new utilities to add a ParDo at the end that converts the dictionaries
to the generated user type. Does that make sense?
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,70 @@
+ def test_error_at_runtime(self):
+ fields = [
+ bigquery.TableFieldSchema(
+ name='number', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None")
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ with self.assertRaises(ValueError):
+ bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema=schema)
Review Comment:
Would you be able to test this through the public API instead (or in
addition to this one)? Something like:
```
with self.assertRaises(ValueError):
  ReadFromBigQuery(..., output_type="BEAM_ROW")
```
This would require mocking the bigquery client so it returns the bad schema.
I expected to find lots of examples of this, but there don't seem to be any;
the best I could find is this:
https://github.com/apache/beam/blob/da4fcad9200862863e6de98b53ef4b1a2154b711/sdks/python/apache_beam/io/gcp/bigquery_test.py#L468-L472
I think it should be possible to use a pattern like that, but there might be
some gotchas I'm missing.
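A minimal sketch of the mocking pattern, using `unittest.mock.patch.object` so the table lookup returns a schema with an unsupported field. `FakeBigQueryWrapper`, `read_with_schema`, `SUPPORTED_TYPES`, and the dict-shaped table are hypothetical stand-ins for `bigquery_tools.BigQueryWrapper`, the `ReadFromBigQuery` expansion, and `TableSchema`; in the real test the patch target would presumably be the wrapper's `get_table`, which the PR's `produce_pcoll_using_bqio` calls.

```python
import unittest
from unittest import mock


class FakeBigQueryWrapper:
  """Hypothetical stand-in for bigquery_tools.BigQueryWrapper."""

  def get_table(self, project_id, dataset_id, table_id):
    raise RuntimeError("would call the BigQuery API")


SUPPORTED_TYPES = {"STRING", "INTEGER", "FLOAT64"}


def read_with_schema(project_id, dataset_id, table_id):
  """Hypothetical stand-in for expanding ReadFromBigQuery(output_type=...)."""
  table = FakeBigQueryWrapper().get_table(project_id, dataset_id, table_id)
  for field in table["fields"]:
    if field["type"] not in SUPPORTED_TYPES:
      raise ValueError(f"Unsupported BigQuery type: {field['type']!r}")
  return table


class ReadWithBadSchemaTest(unittest.TestCase):
  def test_bad_schema_raises(self):
    bad_table = {"fields": [{"name": "number", "type": "DOUBLE"}]}
    # Patch the class attribute so every wrapper instance returns bad_table
    # instead of contacting BigQuery.
    with mock.patch.object(
        FakeBigQueryWrapper, "get_table", return_value=bad_table):
      with self.assertRaisesRegex(ValueError, "DOUBLE"):
        read_with_schema("project", "dataset", "table")
```

Patching the class rather than an instance matters here, because the code under test constructs its own wrapper.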
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,112 @@
+ @classmethod
+ def _from_serialized_schema(cls, dict_of_tuples):
+ return cls(
+ beam.typehints.schemas.named_tuple_from_schema(
+ beam.dataframe.schemas.proto_utils.parse_Bytes(
Review Comment:
This is the wrong package name for this function, it lives here:
https://github.com/apache/beam/blob/da4fcad9200862863e6de98b53ef4b1a2154b711/sdks/python/apache_beam/utils/proto_utils.py#L101
(`beam.utils.proto_utils`)
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,112 @@
+class BeamSchemaUnbatchDoFn(beam.DoFn):
Review Comment:
I think this DoFn should have a `process` method that converts a dictionary
input to an instance of `pcoll_val_ctor` (what you were doing in the lambda
before).
Also a small nit: this shouldn't be doing any unbatching, so I'd just call
it `ConvertToBeamSchemaDoFn` (or similar).
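A rough sketch of the suggested `process` method, assuming `pcoll_val_ctor` is a `NamedTuple`-style type whose field names match the dictionary keys. In the PR this class would subclass `beam.DoFn`; it is a plain class here, and `Station` is a made-up example type, so the snippet runs without Beam installed.

```python
from typing import NamedTuple, Optional, Sequence


class ConvertToBeamSchemaDoFn:  # would subclass beam.DoFn in the PR
  def __init__(self, pcoll_val_ctor):
    self._pcoll_val_ctor = pcoll_val_ctor

  def process(self, element):
    # Each element is one BigQuery row as a dict; build the schema'd type
    # from it by keyword so fields are matched by name, not position.
    yield self._pcoll_val_ctor(**element)


class Station(NamedTuple):  # hypothetical generated user type
  stn: Optional[str]
  temp: Sequence[float]
  count: int
```

Keyword construction fails loudly with a `TypeError` if a row contains a column the generated type does not declare.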
##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,70 @@
+ def test_error_at_runtime(self):
Review Comment:
Please also add a test for an unsupported type, and make sure that it
raises a helpful error (I think right now it will just get a `KeyError` when
accessing `BIG_QUERY_TO_PYTHON_TYPES`).
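A sketch of such a test, assuming the lookup is guarded so that an unknown type produces a `ValueError` naming both the offending type and the supported set. `lookup_python_type` and the reduced `TYPE_MAP` are hypothetical stand-ins for the lookup in `produce_pcoll_with_schema` and for `BIG_QUERY_TO_PYTHON_TYPES`; `GEOGRAPHY` is used only because it is a real BigQuery type that the PR's mapping does not cover.

```python
import unittest

# Reduced, illustrative version of BIG_QUERY_TO_PYTHON_TYPES.
TYPE_MAP = {"STRING": str, "INTEGER": int, "FLOAT64": float}


def lookup_python_type(bq_type):
  try:
    return TYPE_MAP[bq_type]
  except KeyError:
    # Re-raise with context instead of surfacing a bare KeyError.
    raise ValueError(
        f"Encountered an unsupported BigQuery type: {bq_type!r}. "
        f"Supported types: {sorted(TYPE_MAP)}") from None


class UnsupportedTypeTest(unittest.TestCase):
  def test_unsupported_type(self):
    with self.assertRaisesRegex(
        ValueError, "unsupported BigQuery type: 'GEOGRAPHY'"):
      lookup_python_type("GEOGRAPHY")
```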
##########
sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py:
##########
@@ -178,6 +178,30 @@ def test_iobase_source(self):
query=query, use_standard_sql=True, project=self.project))
assert_that(result, equal_to(self.TABLE_DATA))
+ @pytest.mark.it_postcommit
+ def test_table_schema_retrieve(self):
+ the_table = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+ project_id="apache-beam-testing",
+ dataset_id="beam_bigquery_io_test",
+ table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
+ table = the_table.schema
+ utype = beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(table)
+ with beam.Pipeline(argv=self.args) as p:
+ result = (
+ p | beam.io.gcp.bigquery.ReadFromBigQuery(
+ gcs_location="gs://bqio_schema",
+ table="beam_bigquery_io_test.dfsqltable_3c7d6fd5_16e0460dfd0",
+ project="apache-beam-testing")
+ | beam.io.gcp.bigquery.ReadFromBigQuery.get_pcoll_from_schema(table))
Review Comment:
When the previous comment is addressed, we should be able to test this like
so:
```suggestion
p | beam.io.gcp.bigquery.ReadFromBigQuery(
gcs_location="gs://bqio_schema",
table="beam_bigquery_io_test.dfsqltable_3c7d6fd5_16e0460dfd0",
project="apache-beam-testing",
output_type="BEAM_ROW")
```
Issue Time Tracking
-------------------
Worklog Id: (was: 772089)
Time Spent: 9h 50m (was: 9h 40m)
> Support pd.read_gbq and DataFrame.to_gbq
> ----------------------------------------
>
> Key: BEAM-11587
> URL: https://issues.apache.org/jira/browse/BEAM-11587
> Project: Beam
> Issue Type: New Feature
> Components: dsl-dataframe, io-py-gcp, sdk-py-core
> Reporter: Brian Hulette
> Assignee: Svetak Vihaan Sundhar
> Priority: P3
> Labels: dataframe-api
> Time Spent: 9h 50m
> Remaining Estimate: 0h
>
> We should support
> [read_gbq|https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_gbq.html]
> and
> [to_gbq|https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_gbq.html]
> in the DataFrame API when gcp extras are installed.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)