TheNeuralBit commented on a change in pull request #17159:
URL: https://github.com/apache/beam/pull/17159#discussion_r839048090
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -2393,6 +2395,40 @@ def expand(self, pcoll):
'The method to read from BigQuery must be either EXPORT'
'or DIRECT_READ.')
+ def produce_pcoll_with_schema(self, project_id, dataset_id, table_id):
Review comment:
Since this doesn't actually use `self`, I think it would be a good idea
to pull this logic out into a helper function.
[bigquery_tools.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery_tools.py)
could be a good place, or we might define a new file for it, like
`bigquery_schema_tools.py` (it looks like there's precedent for this; see
`bigquery_avro_tools.py`). Would you have a preference, @pabloem?
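For illustration, a standalone version of such a helper might look like the sketch below. The module placement, function name, and lookup-table approach are assumptions, not existing Beam API; the lookup covers only the types the diff already handles, and NUMERIC/BIGNUMERIC are deliberately omitted.

```python
# Hypothetical sketch of a module-level helper (e.g. in a new
# bigquery_schema_tools.py). Names here are assumptions, not Beam API.
import numpy as np

# BigQuery type name -> Python type, mirroring the if/elif chain in the
# diff. NUMERIC/BIGNUMERIC are intentionally left out.
_BQ_TO_PYTHON_TYPE = {
    'STRING': str,
    'INTEGER': np.int64,
    'FLOAT': np.float64,
    'BOOL': bool,
    'BYTES': bytes,
}


def bq_schema_to_typed_fields(table_schema):
  """Converts a dict-form BigQuery schema to a list of (name, type) tuples."""
  typed_fields = []
  for field in table_schema['fields']:
    try:
      typ = _BQ_TO_PYTHON_TYPE[field['type']]
    except KeyError:
      raise ValueError('Unsupported BigQuery type: %s' % field['type'])
    typed_fields.append((field['name'], typ))
  return typed_fields
```

Because the function takes the dict-form schema directly, it needs no `self` and can be unit-tested without a live BigQuery table.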
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -2393,6 +2395,40 @@ def expand(self, pcoll):
'The method to read from BigQuery must be either EXPORT'
'or DIRECT_READ.')
+  def produce_pcoll_with_schema(self, project_id, dataset_id, table_id):
+    the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper(
+    ).get_table(project_id, dataset_id, table_id)
+    the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
+        the_table_schema)
+    dict_of_tuples = []
+    for field in the_schema['fields']:
+      if field['type'] == 'STRING':
+        typ = str
+      elif field['type'] == 'INTEGER':
+        typ = np.int64
+      elif field['type'] == 'FLOAT':
+        typ = np.float64
+      elif field['type'] == 'NUMERIC':
+        typ = np.float128
+      elif field['type'] == 'BIGNUMERIC':
+        typ = np.float128
Review comment:
I think we should hold off on defining mappings for these; `np.float128`
isn't a perfect mapping, since it's floating point and these are fixed-point
types.
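To illustrate the precision concern: NUMERIC and BIGNUMERIC are decimal fixed-point types, and a binary float cannot represent every such value exactly. Python's `decimal.Decimal` is mentioned here only as an illustration of an exact decimal representation, not as the mapping the PR should necessarily adopt.

```python
# Illustration only: why a binary float is a lossy stand-in for a
# decimal fixed-point value such as BigQuery NUMERIC.
from decimal import Decimal

# Converting the float 0.1 to Decimal exposes its true binary value,
# which is not exactly one tenth.
assert Decimal(0.1) != Decimal('0.1')

# A Decimal built from the literal string is exact, so decimal
# arithmetic behaves the way fixed-point users expect.
assert Decimal('0.1') * 10 == Decimal('1')
```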
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -2393,6 +2395,40 @@ def expand(self, pcoll):
'The method to read from BigQuery must be either EXPORT'
'or DIRECT_READ.')
+  def produce_pcoll_with_schema(self, project_id, dataset_id, table_id):
+    the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper(
+    ).get_table(project_id, dataset_id, table_id)
+    the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
+        the_table_schema)
+    dict_of_tuples = []
+    for field in the_schema['fields']:
+      if field['type'] == 'STRING':
+        typ = str
+      elif field['type'] == 'INTEGER':
+        typ = np.int64
+      elif field['type'] == 'FLOAT':
+        typ = np.float64
+      elif field['type'] == 'NUMERIC':
+        typ = np.float128
+      elif field['type'] == 'BIGNUMERIC':
+        typ = np.float128
+      elif field['type'] == 'BOOL':
+        typ = bool
+      elif field['type'] == 'BYTES':
+        typ = bytes
+      elif field['type'] == 'TIMESTAMP':
+        typ = beam.utils.timestamp.Timestamp
+      else:
+        raise ValueError(field['type'])
+      # TODO svetaksundhar@: Map remaining BQ types
+      dict_of_tuples.append((field['name'], typ))
Review comment:
I think we also need to look at whether these fields are nullable or
required (if nullable, we should wrap the type in `Optional[T]`), and
whether they are repeated (if so, we should wrap them in `Sequence[T]`).
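A sketch of that mode-aware wrapping is below. The helper name and the type lookup are hypothetical; the `mode` values (NULLABLE / REQUIRED / REPEATED) follow the BigQuery schema format, which defaults a field's mode to NULLABLE when it is unspecified.

```python
# Hypothetical sketch: wrap the mapped Python type according to the
# field's BigQuery mode. Names are illustrative, not Beam API.
from typing import Optional, Sequence

import numpy as np

_BQ_TO_PYTHON_TYPE = {
    'STRING': str,
    'INTEGER': np.int64,
    'FLOAT': np.float64,
    'BOOL': bool,
    'BYTES': bytes,
}


def field_to_type(field):
  """Maps one schema field dict to a Python type, honoring its mode."""
  typ = _BQ_TO_PYTHON_TYPE[field['type']]
  # BigQuery treats an unspecified mode as NULLABLE.
  mode = field.get('mode', 'NULLABLE')
  if mode == 'REPEATED':
    return Sequence[typ]
  if mode == 'NULLABLE':
    return Optional[typ]
  return typ  # REQUIRED
```

For example, a REPEATED INTEGER field would map to `Sequence[np.int64]`, while a NULLABLE BOOL field would map to `Optional[bool]`.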