[ 
https://issues.apache.org/jira/browse/BEAM-11587?focusedWorklogId=772089&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772089
 ]

ASF GitHub Bot logged work on BEAM-11587:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/May/22 19:09
            Start Date: 18/May/22 19:09
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on code in PR #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r876239026


##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+import unittest
+
+import numpy as np
+
+from apache_beam.io.gcp import bigquery_schema_tools
+from apache_beam.io.gcp.bigquery_test import HttpError
+from apache_beam.io.gcp.internal.clients import bigquery
+
+
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryToSchema(unittest.TestCase):
+  def test_produce_pcoll_with_schema(self):
+    fields = [
+        bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
+        bigquery.TableFieldSchema(name='temp', type='FLOAT64', 
mode="REPEATED"),
+        bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None")
+    ]
+    schema = bigquery.TableSchema(fields=fields)
+
+    usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+        the_table_schema=schema)
+    self.assertEqual(
+        usertype.__annotations__,
+        {
+            'stn': typing.Optional[str],
+            'temp': typing.Sequence[np.float64],
+            'count': np.int64
+        })
+
+  def test_produce_pcoll_with_empty_schema(self):
+    fields = []
+    schema = bigquery.TableSchema(fields=fields)
+
+    usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+        the_table_schema=schema)
+    self.assertEqual(usertype.__annotations__, {})
+
+  def test_error_at_runtime(self):
+    fields = [
+        bigquery.TableFieldSchema(
+            name='number', type='DOUBLE', mode="NULLABLE"),
+        bigquery.TableFieldSchema(name='temp', type='FLOAT64', 
mode="REPEATED"),
+        bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None")
+    ]
+    schema = bigquery.TableSchema(fields=fields)
+    with self.assertRaises(ValueError):

Review Comment:
   Could you use `assertRaisesRegex` and make sure the message matches the one 
you've defined



##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tools used tool work with Schema types in the context of BigQuery.
+Classes, constants and functions in this file are experimental and have no
+backwards compatibility guarantees.
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.io.gcp.internal.clients import bigquery
+
+# BigQuery types as listed in
+# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in
+# 
https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String-
+BIG_QUERY_TO_PYTHON_TYPES = {
+    "STRING": str,
+    "INTEGER": np.int64,
+    "FLOAT64": np.float64,
+    "BOOLEAN": bool,
+    "BYTES": bytes,
+    "TIMESTAMP": beam.utils.timestamp.Timestamp,
+    #TODO svetaksundhar@: Finish mappings for all BQ types
+}
+
+
+def produce_pcoll_with_schema(the_table_schema):
+  #type: (bigquery.TableSchema) -> type
+
+  """Convert a schema of type TableSchema into a pcollection element.
+      Args:
+        the_table_schema: A BQ schema of type TableSchema
+      Returns:
+        type: type that can be used to work with pCollections.
+  """
+
+  the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
+      the_table_schema)
+  if the_schema == {}:
+    raise ValueError("The schema is empty")
+  dict_of_tuples = []
+  for i in range(len(the_schema['fields'])):
+    if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES:
+      typ = bq_field_to_type(
+          the_schema['fields'][i]['type'], the_schema['fields'][i]['mode'])
+    else:
+      raise ValueError(the_schema['fields'][i]['type'])
+    # TODO svetaksundhar@: Map remaining BQ types
+    dict_of_tuples.append((the_schema['fields'][i]['name'], typ))
+  sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples)
+  usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema)
+  return usertype
+
+
+def produce_pcoll_using_bqio(project_id, dataset_id, table_id):
+  the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper(
+  ).get_table(project_id, dataset_id, table_id)
+  beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema)
+
+
+def bq_field_to_type(field, mode):
+  if mode == 'NULLABLE':
+    return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]]
+  elif mode == 'REPEATED':
+    return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]]
+  elif mode == 'None' or mode == '':
+    return BIG_QUERY_TO_PYTHON_TYPES[field]
+  else:
+    return ValueError("Not a supported mode")

Review Comment:
   Please make this reference the unsupported mode
   ```suggestion
       return ValueError(f"Encountered an unsupported mode: {mode!r}")
   ```



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2422,6 +2423,9 @@ class ReadFromBigQuery(PTransform):
       to run queries with INTERACTIVE priority. This option is ignored when
       reading from a table rather than a query. To learn more about query
       priority, see: https://cloud.google.com/bigquery/docs/running-queries
+    output_type (str): By default, the schema returned from this transform
+    would be of type TableSchema. Other schema types can be specified
+    ("BEAM_SCHEMAS").

Review Comment:
   This argument should be referring to the type of the elements in the output 
PCollection. The status quo (what should become the default) is to produce 
Python `dict`s, let's call this `"PYTHON_DICT"`, and make that default rather 
than `None`.
   
   For this new feature, I think we should call it `"BEAM_ROW"` to standardize 
with other terminology across Beam (a *Row* is an instance/element, whose type 
is described by a *Schema*)



##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tools used tool work with Schema types in the context of BigQuery.
+Classes, constants and functions in this file are experimental and have no
+backwards compatibility guarantees.
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.io.gcp.internal.clients import bigquery
+
+# BigQuery types as listed in
+# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in
+# 
https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String-
+BIG_QUERY_TO_PYTHON_TYPES = {
+    "STRING": str,
+    "INTEGER": np.int64,
+    "FLOAT64": np.float64,
+    "BOOLEAN": bool,
+    "BYTES": bytes,
+    "TIMESTAMP": beam.utils.timestamp.Timestamp,
+    #TODO svetaksundhar@: Finish mappings for all BQ types
+}
+
+
+def produce_pcoll_with_schema(the_table_schema):
+  #type: (bigquery.TableSchema) -> type
+
+  """Convert a schema of type TableSchema into a pcollection element.
+      Args:
+        the_table_schema: A BQ schema of type TableSchema
+      Returns:
+        type: type that can be used to work with pCollections.
+  """
+
+  the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
+      the_table_schema)
+  if the_schema == {}:
+    raise ValueError("The schema is empty")
+  dict_of_tuples = []
+  for i in range(len(the_schema['fields'])):
+    if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES:
+      typ = bq_field_to_type(
+          the_schema['fields'][i]['type'], the_schema['fields'][i]['mode'])
+    else:
+      raise ValueError(the_schema['fields'][i]['type'])
+    # TODO svetaksundhar@: Map remaining BQ types
+    dict_of_tuples.append((the_schema['fields'][i]['name'], typ))
+  sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples)
+  usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema)
+  return usertype
+
+
+def produce_pcoll_using_bqio(project_id, dataset_id, table_id):
+  the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper(
+  ).get_table(project_id, dataset_id, table_id)
+  beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema)
+
+
+def bq_field_to_type(field, mode):
+  if mode == 'NULLABLE':
+    return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]]
+  elif mode == 'REPEATED':
+    return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]]
+  elif mode == 'None' or mode == '':
+    return BIG_QUERY_TO_PYTHON_TYPES[field]
+  else:
+    return ValueError("Not a supported mode")
+
+
+class BeamSchemaUnbatchDoFn(beam.DoFn):
+  def __init__(self, pcoll_val_ctor):
+    self._pcoll_val_ctor = pcoll_val_ctor
+
+  def infer_output_type(self, input_type):
+    return self._pcoll_val_ctor
+
+  @classmethod
+  def _from_serialized_schema(cls, dict_of_tuples):
+    return cls(
+        beam.typehints.schemas.named_tuple_from_schema(
+            beam.dataframe.schemas.proto_utils.parse_Bytes(

Review Comment:
   Oh I think this is re-importing the one that was imported in 
`beam.dataframe.schemas`? and does the same thing below for schema_pb2.
   
   Please update this to reference the underlying packages directly rather than 
going through `beam.dataframe`



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2434,10 +2438,12 @@ def __init__(
       gcs_location=None,
       method=None,
       use_native_datetime=False,
+      output_type=None,
       *args,
       **kwargs):
     self.method = method or ReadFromBigQuery.Method.EXPORT
     self.use_native_datetime = use_native_datetime
+    self.output_type = output_type

Review Comment:
   It looks like this isn't actually used anywhere. We should inspect this 
variable when the transform is expanded, and if it is `"BEAM_ROW"`, we should 
use your new utilities to add a ParDo at the end that converts the dictionaries 
to the generated user type. Does that make sense?



##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+import unittest
+
+import numpy as np
+
+from apache_beam.io.gcp import bigquery_schema_tools
+from apache_beam.io.gcp.bigquery_test import HttpError
+from apache_beam.io.gcp.internal.clients import bigquery
+
+
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryToSchema(unittest.TestCase):
+  def test_produce_pcoll_with_schema(self):
+    fields = [
+        bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
+        bigquery.TableFieldSchema(name='temp', type='FLOAT64', 
mode="REPEATED"),
+        bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None")
+    ]
+    schema = bigquery.TableSchema(fields=fields)
+
+    usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+        the_table_schema=schema)
+    self.assertEqual(
+        usertype.__annotations__,
+        {
+            'stn': typing.Optional[str],
+            'temp': typing.Sequence[np.float64],
+            'count': np.int64
+        })
+
+  def test_produce_pcoll_with_empty_schema(self):
+    fields = []
+    schema = bigquery.TableSchema(fields=fields)
+
+    usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+        the_table_schema=schema)
+    self.assertEqual(usertype.__annotations__, {})
+
+  def test_error_at_runtime(self):
+    fields = [
+        bigquery.TableFieldSchema(
+            name='number', type='DOUBLE', mode="NULLABLE"),
+        bigquery.TableFieldSchema(name='temp', type='FLOAT64', 
mode="REPEATED"),
+        bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None")
+    ]
+    schema = bigquery.TableSchema(fields=fields)
+    with self.assertRaises(ValueError):
+      bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema=schema)

Review Comment:
   Would you be able to test this through the public API instead (or in 
addition to this one)? Something like:
   ```
   with self.assertRaises:
     ReadFromBigQuery(..., output_type=schema)
   ```
   
   This would require mocking the bigquery client so it returns the bad schema. 
I thought I could find lots of examples of this but there don't actually seem 
to be any... the best I can find is this:
   
https://github.com/apache/beam/blob/da4fcad9200862863e6de98b53ef4b1a2154b711/sdks/python/apache_beam/io/gcp/bigquery_test.py#L468-L472
   
   I think it should be possible to use a pattern like that but there might be 
some gotchas I'm missing.



##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tools used tool work with Schema types in the context of BigQuery.
+Classes, constants and functions in this file are experimental and have no
+backwards compatibility guarantees.
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.io.gcp.internal.clients import bigquery
+
+# BigQuery types as listed in
+# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in
+# 
https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String-
+BIG_QUERY_TO_PYTHON_TYPES = {
+    "STRING": str,
+    "INTEGER": np.int64,
+    "FLOAT64": np.float64,
+    "BOOLEAN": bool,
+    "BYTES": bytes,
+    "TIMESTAMP": beam.utils.timestamp.Timestamp,
+    #TODO svetaksundhar@: Finish mappings for all BQ types
+}
+
+
+def produce_pcoll_with_schema(the_table_schema):
+  #type: (bigquery.TableSchema) -> type
+
+  """Convert a schema of type TableSchema into a pcollection element.
+      Args:
+        the_table_schema: A BQ schema of type TableSchema
+      Returns:
+        type: type that can be used to work with pCollections.
+  """
+
+  the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
+      the_table_schema)
+  if the_schema == {}:
+    raise ValueError("The schema is empty")
+  dict_of_tuples = []
+  for i in range(len(the_schema['fields'])):
+    if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES:
+      typ = bq_field_to_type(
+          the_schema['fields'][i]['type'], the_schema['fields'][i]['mode'])
+    else:
+      raise ValueError(the_schema['fields'][i]['type'])
+    # TODO svetaksundhar@: Map remaining BQ types
+    dict_of_tuples.append((the_schema['fields'][i]['name'], typ))
+  sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples)
+  usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema)
+  return usertype
+
+
+def produce_pcoll_using_bqio(project_id, dataset_id, table_id):
+  the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper(
+  ).get_table(project_id, dataset_id, table_id)
+  beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema)
+
+
+def bq_field_to_type(field, mode):
+  if mode == 'NULLABLE':
+    return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]]
+  elif mode == 'REPEATED':
+    return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]]
+  elif mode == 'None' or mode == '':
+    return BIG_QUERY_TO_PYTHON_TYPES[field]
+  else:
+    return ValueError("Not a supported mode")
+
+
+class BeamSchemaUnbatchDoFn(beam.DoFn):
+  def __init__(self, pcoll_val_ctor):
+    self._pcoll_val_ctor = pcoll_val_ctor
+
+  def infer_output_type(self, input_type):
+    return self._pcoll_val_ctor
+
+  @classmethod
+  def _from_serialized_schema(cls, dict_of_tuples):
+    return cls(
+        beam.typehints.schemas.named_tuple_from_schema(
+            beam.dataframe.schemas.proto_utils.parse_Bytes(

Review Comment:
   This is the wrong package name for this function, it lives here: 
https://github.com/apache/beam/blob/da4fcad9200862863e6de98b53ef4b1a2154b711/sdks/python/apache_beam/utils/proto_utils.py#L101
   
   (`beam.utils.proto_utils`)



##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py:
##########
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tools used tool work with Schema types in the context of BigQuery.
+Classes, constants and functions in this file are experimental and have no
+backwards compatibility guarantees.
+NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
+"""
+
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.io.gcp.internal.clients import bigquery
+
+# BigQuery types as listed in
+# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in
+# 
https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String-
+BIG_QUERY_TO_PYTHON_TYPES = {
+    "STRING": str,
+    "INTEGER": np.int64,
+    "FLOAT64": np.float64,
+    "BOOLEAN": bool,
+    "BYTES": bytes,
+    "TIMESTAMP": beam.utils.timestamp.Timestamp,
+    #TODO svetaksundhar@: Finish mappings for all BQ types
+}
+
+
+def produce_pcoll_with_schema(the_table_schema):
+  #type: (bigquery.TableSchema) -> type
+
+  """Convert a schema of type TableSchema into a pcollection element.
+      Args:
+        the_table_schema: A BQ schema of type TableSchema
+      Returns:
+        type: type that can be used to work with pCollections.
+  """
+
+  the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
+      the_table_schema)
+  if the_schema == {}:
+    raise ValueError("The schema is empty")
+  dict_of_tuples = []
+  for i in range(len(the_schema['fields'])):
+    if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES:
+      typ = bq_field_to_type(
+          the_schema['fields'][i]['type'], the_schema['fields'][i]['mode'])
+    else:
+      raise ValueError(the_schema['fields'][i]['type'])
+    # TODO svetaksundhar@: Map remaining BQ types
+    dict_of_tuples.append((the_schema['fields'][i]['name'], typ))
+  sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples)
+  usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema)
+  return usertype
+
+
+def produce_pcoll_using_bqio(project_id, dataset_id, table_id):
+  the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper(
+  ).get_table(project_id, dataset_id, table_id)
+  beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema)
+
+
+def bq_field_to_type(field, mode):
+  if mode == 'NULLABLE':
+    return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]]
+  elif mode == 'REPEATED':
+    return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]]
+  elif mode == 'None' or mode == '':
+    return BIG_QUERY_TO_PYTHON_TYPES[field]
+  else:
+    return ValueError("Not a supported mode")
+
+
+class BeamSchemaUnbatchDoFn(beam.DoFn):

Review Comment:
   I think this DoFn should have a `process` method that converts a dictionary 
input to an instance of `pcoll_val_ctor` (what you were doing in the lambda 
before).
   
   Also a small nit: this shouldn't be doing any unbatching, so I'd just call 
it `ConvertToBeamSChemaDoFn` (or similar).



##########
sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py:
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import typing
+import unittest
+
+import numpy as np
+
+from apache_beam.io.gcp import bigquery_schema_tools
+from apache_beam.io.gcp.bigquery_test import HttpError
+from apache_beam.io.gcp.internal.clients import bigquery
+
+
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class TestBigQueryToSchema(unittest.TestCase):
+  def test_produce_pcoll_with_schema(self):
+    fields = [
+        bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
+        bigquery.TableFieldSchema(name='temp', type='FLOAT64', 
mode="REPEATED"),
+        bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None")
+    ]
+    schema = bigquery.TableSchema(fields=fields)
+
+    usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+        the_table_schema=schema)
+    self.assertEqual(
+        usertype.__annotations__,
+        {
+            'stn': typing.Optional[str],
+            'temp': typing.Sequence[np.float64],
+            'count': np.int64
+        })
+
+  def test_produce_pcoll_with_empty_schema(self):
+    fields = []
+    schema = bigquery.TableSchema(fields=fields)
+
+    usertype = bigquery_schema_tools.produce_pcoll_with_schema(
+        the_table_schema=schema)
+    self.assertEqual(usertype.__annotations__, {})
+
+  def test_error_at_runtime(self):

Review Comment:
   Please also add a test for an unsupported type, and make sure that that 
raises a helpful error (I think right now it will just get a `KeyError` when 
accessing `BIG_QUERY_TO_PYTHON_TYPES`)



##########
sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py:
##########
@@ -178,6 +178,30 @@ def test_iobase_source(self):
               query=query, use_standard_sql=True, project=self.project))
       assert_that(result, equal_to(self.TABLE_DATA))
 
+  @pytest.mark.it_postcommit
+  def test_table_schema_retrieve(self):
+    the_table = 
beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table(
+        project_id="apache-beam-testing",
+        dataset_id="beam_bigquery_io_test",
+        table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
+    table = the_table.schema
+    utype = beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(table)
+    with beam.Pipeline(argv=self.args) as p:
+      result = (
+          p | beam.io.gcp.bigquery.ReadFromBigQuery(
+              gcs_location="gs://bqio_schema",
+              table="beam_bigquery_io_test.dfsqltable_3c7d6fd5_16e0460dfd0",
+              project="apache-beam-testing")
+          | beam.io.gcp.bigquery.ReadFromBigQuery.get_pcoll_from_schema(table))

Review Comment:
   When the previous comment is addressed, we should be able to test this like 
so:
   ```suggestion
             p | beam.io.gcp.bigquery.ReadFromBigQuery(
                 gcs_location="gs://bqio_schema",
                 table="beam_bigquery_io_test.dfsqltable_3c7d6fd5_16e0460dfd0",
                 project="apache-beam-testing",
                 output_type="BEAM_ROW")
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 772089)
    Time Spent: 9h 50m  (was: 9h 40m)

> Support pd.read_gbq and DataFrame.to_gbq
> ----------------------------------------
>
>                 Key: BEAM-11587
>                 URL: https://issues.apache.org/jira/browse/BEAM-11587
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-dataframe, io-py-gcp, sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Svetak Vihaan Sundhar
>            Priority: P3
>              Labels: dataframe-api
>          Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> We should support 
> [read_gbq|https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_gbq.html]
>  and 
> [to_gbq|https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_gbq.html]
>  in the DataFrame API when gcp extras are installed.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to