TheNeuralBit commented on code in PR #22616:
URL: https://github.com/apache/beam/pull/22616#discussion_r941878880
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -58,6 +58,17 @@
_DEFAULT_BYTES_CHUNKSIZE = 1 << 20
+@frame_base.with_docs_from(pd)
+def read_gbq(table, *args, **kwargs):
+ """If you would like to use the 'DIRECT_READ' method ins ReadFromBigQuery,
+ please set use_bq_storage_api to True.
+ Otherwise, if you would like to use the 'EXPORT' method, please set
+ use_bq_storage_api to False, or leave it unspecified."""
Review Comment:
I could be swayed, but I think it would be preferable not to document how
this works under the hood (e.g. that it relies on `ReadFromBigQuery`), and
instead just present the user-facing API.
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -58,6 +58,17 @@
_DEFAULT_BYTES_CHUNKSIZE = 1 << 20
+@frame_base.with_docs_from(pd)
Review Comment:
I'm not sure it makes sense to pull the docs from pandas in this case. We're
diverging from pandas pretty significantly here: there are a lot of arguments
we don't need (e.g. auth related ones), and we prefer a `table` argument rather
than `query`. I'd suggest writing a whole new docstring (possibly drawing
heavily from the pandas one).
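For example, something roughly like this (just a sketch, the wording and the exact
parameter set are open):
```python
def read_gbq(table, use_bqstorage_api=False, **kwargs):
  """Reads a BigQuery table into a deferred Beam DataFrame.

  Args:
    table (str): Fully-qualified BigQuery table reference, specified as
      ``'PROJECT:DATASET.TABLE'``.
    use_bqstorage_api (bool): If True, read directly from BigQuery storage
      using the BigQuery Read API; otherwise (the default) a BigQuery
      export job is used.

  Returns:
    A deferred DataFrame representing the contents of the table.
  """
```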
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -756,3 +767,81 @@ def expand(self, pcoll):
| beam.Map(lambda file_result:
file_result.file_name).with_output_types(
str)
}
+
+
+class _ReadGbq(beam.PTransform):
+ """Read data from BigQuery with output type 'BEAM_ROW',
+ then convert it into a deferred dataframe.
+
+ This PTransform wraps the Python ReadFromBigQuery PTransform,
+ and sets the output_type as 'BEAM_ROW' to convert
+ into a Beam Schema. Once applied to a pipeline object,
+ it is passed into the to_dataframe() function to convert the
+ PCollection into a deferred dataframe.
+
+ This PTransform currently does not support queries.
+ Note that all Python ReadFromBigQuery args can be passed in
+ to this PTransform, in addition to the args listed below.
+
+ Args:
+ table (str): The ID of the table. The ID must contain only
+ letters ``a-z``, ``A-Z``,
+ numbers ``0-9``, underscores ``_`` or white spaces.
+ Note that the table argument must contain the entire table
+ reference specified as: ``'PROJECT:DATASET.TABLE'``.
+ use_bq_storage_api (bool): The method to use to read from BigQuery.
+ It may be 'EXPORT' or
+ 'DIRECT_READ'. EXPORT invokes a BigQuery export request
+ (https://cloud.google.com/bigquery/docs/exporting-data).
+ 'DIRECT_READ' reads
+ directly from BigQuery storage using the BigQuery Read API
+ (https://cloud.google.com/bigquery/docs/reference/storage). If
+ unspecified or set to false, the default is currently utilized (EXPORT).
+ If the flag is set to true,
+ 'DIRECT_READ' will be utilized."""
+ def __init__(
+ self,
+ table=None,
Review Comment:
I don't see pandas' `project_id` parameter here; I think that's one it makes
sense to add and plumb through to ReadFromBigQuery. We could also add a
Beam-specific `dataset` parameter.
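Something along these lines, maybe (a sketch only; how `project_id`/`dataset`
should map onto ReadFromBigQuery's `project`/`dataset` arguments would need
checking):
```python
import apache_beam as beam


class _ReadGbq(beam.PTransform):
  def __init__(
      self, table, dataset=None, project_id=None, use_bqstorage_api=False):
    self.table = table
    self.dataset = dataset
    self.project_id = project_id
    self.use_bqstorage_api = use_bqstorage_api

  def expand(self, root):
    from apache_beam.dataframe import convert  # avoid circular import
    return convert.to_dataframe(
        root | beam.io.ReadFromBigQuery(
            table=self.table,
            dataset=self.dataset,
            # ReadFromBigQuery's `project` would play the role of
            # pandas-gbq's `project_id`.
            project=self.project_id,
            method='DIRECT_READ' if self.use_bqstorage_api else 'EXPORT',
            output_type='BEAM_ROW'))
```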
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -756,3 +767,81 @@ def expand(self, pcoll):
| beam.Map(lambda file_result:
file_result.file_name).with_output_types(
str)
}
+
+
+class _ReadGbq(beam.PTransform):
+ """Read data from BigQuery with output type 'BEAM_ROW',
+ then convert it into a deferred dataframe.
+
+ This PTransform wraps the Python ReadFromBigQuery PTransform,
+ and sets the output_type as 'BEAM_ROW' to convert
+ into a Beam Schema. Once applied to a pipeline object,
+ it is passed into the to_dataframe() function to convert the
+ PCollection into a deferred dataframe.
+
+ This PTransform currently does not support queries.
+ Note that all Python ReadFromBigQuery args can be passed in
+ to this PTransform, in addition to the args listed below.
+
+ Args:
+ table (str): The ID of the table. The ID must contain only
+ letters ``a-z``, ``A-Z``,
+ numbers ``0-9``, underscores ``_`` or white spaces.
+ Note that the table argument must contain the entire table
+ reference specified as: ``'PROJECT:DATASET.TABLE'``.
+ use_bq_storage_api (bool): The method to use to read from BigQuery.
+ It may be 'EXPORT' or
+ 'DIRECT_READ'. EXPORT invokes a BigQuery export request
+ (https://cloud.google.com/bigquery/docs/exporting-data).
+ 'DIRECT_READ' reads
+ directly from BigQuery storage using the BigQuery Read API
+ (https://cloud.google.com/bigquery/docs/reference/storage). If
+ unspecified or set to false, the default is currently utilized (EXPORT).
+ If the flag is set to true,
+ 'DIRECT_READ' will be utilized."""
+ def __init__(
+ self,
+ table=None,
+ index_col=None,
+ col_order=None,
+ reauth=None,
+ auth_local_webserver=None,
+ dialect=None,
+ location=None,
+ configuration=None,
+ credentials=None,
+ use_bqstorage_api=False,
+ max_results=None,
+ progress_bar_type=None):
+
+ self.table = table
+ self.index_col = index_col
+ self.col_order = col_order
+ self.reauth = reauth
+ self.auth_local_webserver = auth_local_webserver
+ self.dialect = dialect
+ self.location = location
+ self.configuration = configuration
+ self.credentials = credentials
+ self.use_bqstorage_api = use_bqstorage_api
+ self.max_results = max_results
+ self.progress_bar_type = progress_bar_type
+
+ if (self.index_col is not None or self.col_order is not None or
+ self.reauth is not None or self.auth_local_webserver is not None or
+ self.dialect is not None or self.location is not None or
+ self.configuration is not None or self.credentials is not None or
+ self.max_results is not None or self.progress_bar_type) is not None:
+ raise ValueError(
+ "Unsupported parameter entered in ReadGbq. "
+ "Please enter only supported parameters.")
Review Comment:
Instead of checking each unsupported parameter explicitly, could we capture
all of these in `**kwargs` (so the argument list becomes `(table=None,
use_bqstorage_api=False, **kwargs)`) and then raise this error if kwargs is
non-empty? Then the message could also include a list of the unsupported
arguments, like `auth_local_webserver, progress_bar_type, ...`
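i.e. something like (sketch):
```python
def read_gbq(table=None, use_bqstorage_api=False, **kwargs):
  if kwargs:
    raise ValueError(
        'Unsupported parameter(s) passed to read_gbq: ' +
        ', '.join(sorted(kwargs)))
  return _ReadGbq(table=table, use_bqstorage_api=use_bqstorage_api)
```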
##########
sdks/python/apache_beam/dataframe/io_test.py:
##########
@@ -410,5 +419,50 @@ def test_double_write(self):
set(self.read_all_lines(output + 'out2.csv*')))
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class ReadGbqTransformTests(unittest.TestCase):
+ @mock.patch.object(BigQueryWrapper, 'get_table')
+ def test_bad_schema_public_api_direct_read(self, get_table):
+ fields = [
+ bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
+ bigquery.TableFieldSchema(name='temp', type='FLOAT64',
mode="REPEATED"),
+ bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
+ ]
+ schema = bigquery.TableSchema(fields=fields)
+ table = apache_beam.io.gcp.internal.clients.bigquery. \
+ bigquery_v2_messages.Table(
+ schema=schema)
+ get_table.return_value = table
+
+ with self.assertRaisesRegex(ValueError,
+ "Encountered an unsupported type: 'DOUBLE'"):
+ p = apache_beam.Pipeline()
+ pipeline = p | apache_beam.dataframe.io.read_gbq(
+ table="dataset.sample_table", use_bqstorage_api=True)
+ pipeline
Review Comment:
Note the output here should be a deferred dataframe object; it's confusing
to name it `pipeline` (`p` is actually a Pipeline). Regardless, I'd suggest
assigning this to `_` so pylint doesn't complain about an unassigned output or
an unused variable:
```suggestion
_ = p | apache_beam.dataframe.io.read_gbq(
table="dataset.sample_table", use_bqstorage_api=True)
```
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -756,3 +767,81 @@ def expand(self, pcoll):
| beam.Map(lambda file_result:
file_result.file_name).with_output_types(
str)
}
+
+
+class _ReadGbq(beam.PTransform):
+ """Read data from BigQuery with output type 'BEAM_ROW',
+ then convert it into a deferred dataframe.
+
+ This PTransform wraps the Python ReadFromBigQuery PTransform,
+ and sets the output_type as 'BEAM_ROW' to convert
+ into a Beam Schema. Once applied to a pipeline object,
+ it is passed into the to_dataframe() function to convert the
+ PCollection into a deferred dataframe.
+
+ This PTransform currently does not support queries.
+ Note that all Python ReadFromBigQuery args can be passed in
+ to this PTransform, in addition to the args listed below.
+
+ Args:
+ table (str): The ID of the table. The ID must contain only
+ letters ``a-z``, ``A-Z``,
+ numbers ``0-9``, underscores ``_`` or white spaces.
+ Note that the table argument must contain the entire table
+ reference specified as: ``'PROJECT:DATASET.TABLE'``.
+ use_bq_storage_api (bool): The method to use to read from BigQuery.
+ It may be 'EXPORT' or
+ 'DIRECT_READ'. EXPORT invokes a BigQuery export request
+ (https://cloud.google.com/bigquery/docs/exporting-data).
+ 'DIRECT_READ' reads
+ directly from BigQuery storage using the BigQuery Read API
+ (https://cloud.google.com/bigquery/docs/reference/storage). If
+ unspecified or set to false, the default is currently utilized (EXPORT).
+ If the flag is set to true,
+ 'DIRECT_READ' will be utilized."""
+ def __init__(
+ self,
+ table=None,
+ index_col=None,
+ col_order=None,
+ reauth=None,
+ auth_local_webserver=None,
+ dialect=None,
+ location=None,
+ configuration=None,
+ credentials=None,
+ use_bqstorage_api=False,
+ max_results=None,
+ progress_bar_type=None):
+
+ self.table = table
+ self.index_col = index_col
+ self.col_order = col_order
+ self.reauth = reauth
+ self.auth_local_webserver = auth_local_webserver
+ self.dialect = dialect
+ self.location = location
+ self.configuration = configuration
+ self.credentials = credentials
+ self.use_bqstorage_api = use_bqstorage_api
+ self.max_results = max_results
+ self.progress_bar_type = progress_bar_type
+
+ if (self.index_col is not None or self.col_order is not None or
+ self.reauth is not None or self.auth_local_webserver is not None or
+ self.dialect is not None or self.location is not None or
+ self.configuration is not None or self.credentials is not None or
+ self.max_results is not None or self.progress_bar_type) is not None:
+ raise ValueError(
+ "Unsupported parameter entered in ReadGbq. "
+ "Please enter only supported parameters.")
Review Comment:
I'd also suggest moving this to read_gbq, and having `_ReadGbq` just accept
supported params.
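i.e. keep the validation in read_gbq and slim this class down to something like
(sketch):
```python
import apache_beam as beam


class _ReadGbq(beam.PTransform):
  # Only the supported parameters; read_gbq would be responsible for
  # rejecting anything else before constructing this transform.
  def __init__(self, table=None, use_bqstorage_api=False):
    self.table = table
    self.use_bqstorage_api = use_bqstorage_api
```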
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -756,3 +767,81 @@ def expand(self, pcoll):
| beam.Map(lambda file_result:
file_result.file_name).with_output_types(
str)
}
+
+
+class _ReadGbq(beam.PTransform):
+ """Read data from BigQuery with output type 'BEAM_ROW',
+ then convert it into a deferred dataframe.
+
+ This PTransform wraps the Python ReadFromBigQuery PTransform,
+ and sets the output_type as 'BEAM_ROW' to convert
+ into a Beam Schema. Once applied to a pipeline object,
+ it is passed into the to_dataframe() function to convert the
+ PCollection into a deferred dataframe.
+
+ This PTransform currently does not support queries.
+ Note that all Python ReadFromBigQuery args can be passed in
+ to this PTransform, in addition to the args listed below.
+
+ Args:
+ table (str): The ID of the table. The ID must contain only
+ letters ``a-z``, ``A-Z``,
+ numbers ``0-9``, underscores ``_`` or white spaces.
+ Note that the table argument must contain the entire table
+ reference specified as: ``'PROJECT:DATASET.TABLE'``.
+ use_bq_storage_api (bool): The method to use to read from BigQuery.
+ It may be 'EXPORT' or
+ 'DIRECT_READ'. EXPORT invokes a BigQuery export request
+ (https://cloud.google.com/bigquery/docs/exporting-data).
+ 'DIRECT_READ' reads
+ directly from BigQuery storage using the BigQuery Read API
+ (https://cloud.google.com/bigquery/docs/reference/storage). If
+ unspecified or set to false, the default is currently utilized (EXPORT).
+ If the flag is set to true,
+ 'DIRECT_READ' will be utilized."""
+ def __init__(
+ self,
+ table=None,
+ index_col=None,
+ col_order=None,
+ reauth=None,
+ auth_local_webserver=None,
+ dialect=None,
+ location=None,
+ configuration=None,
+ credentials=None,
+ use_bqstorage_api=False,
+ max_results=None,
+ progress_bar_type=None):
+
+ self.table = table
+ self.index_col = index_col
+ self.col_order = col_order
+ self.reauth = reauth
+ self.auth_local_webserver = auth_local_webserver
+ self.dialect = dialect
+ self.location = location
+ self.configuration = configuration
+ self.credentials = credentials
+ self.use_bqstorage_api = use_bqstorage_api
+ self.max_results = max_results
+ self.progress_bar_type = progress_bar_type
+
+ if (self.index_col is not None or self.col_order is not None or
+ self.reauth is not None or self.auth_local_webserver is not None or
+ self.dialect is not None or self.location is not None or
+ self.configuration is not None or self.credentials is not None or
+ self.max_results is not None or self.progress_bar_type) is not None:
+ raise ValueError(
+ "Unsupported parameter entered in ReadGbq. "
+ "Please enter only supported parameters.")
+
+ def expand(self, root):
+ from apache_beam.dataframe import convert # avoid circular import
+ if self.use_bqstorage_api:
+ return convert.to_dataframe(
+ root | beam.io.ReadFromBigQuery(
+ table=self.table, method='DIRECT_READ', output_type='BEAM_ROW'))
+ return convert.to_dataframe(
+ root
+ | beam.io.ReadFromBigQuery(table=self.table, output_type='BEAM_ROW'))
Review Comment:
nit: you could just select the method inside the branching logic, and re-use
the rest.
```suggestion
if self.use_bqstorage_api:
method = 'DIRECT_READ'
else:
method = 'EXPORT'
return convert.to_dataframe(
root
| beam.io.ReadFromBigQuery(table=self.table, method=method,
output_type='BEAM_ROW'))
```
##########
sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py:
##########
@@ -259,6 +259,43 @@ def test_table_schema_retrieve_with_direct_read(self):
]))
+class ReadUsingReadGbqTests(BigQueryReadIntegrationTests):
Review Comment:
I'd suggest making a new `dataframe.io_it_test` for these, rather than
adding them here. Will that work?
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -756,3 +767,81 @@ def expand(self, pcoll):
| beam.Map(lambda file_result:
file_result.file_name).with_output_types(
str)
}
+
+
+class _ReadGbq(beam.PTransform):
+ """Read data from BigQuery with output type 'BEAM_ROW',
+ then convert it into a deferred dataframe.
+
+ This PTransform wraps the Python ReadFromBigQuery PTransform,
+ and sets the output_type as 'BEAM_ROW' to convert
+ into a Beam Schema. Once applied to a pipeline object,
+ it is passed into the to_dataframe() function to convert the
+ PCollection into a deferred dataframe.
+
+ This PTransform currently does not support queries.
+ Note that all Python ReadFromBigQuery args can be passed in
+ to this PTransform, in addition to the args listed below.
Review Comment:
I don't think this is right; I don't see any logic to pass the
ReadFromBigQuery args through.
##########
sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py:
##########
@@ -259,6 +259,43 @@ def test_table_schema_retrieve_with_direct_read(self):
]))
+class ReadUsingReadGbqTests(BigQueryReadIntegrationTests):
+ @pytest.mark.it_postcommit
+ def test_ReadGbq(self):
+ with beam.Pipeline(argv=self.args) as p:
+ expected_df = (
+ p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
+ method=beam.io.ReadFromBigQuery.Method.EXPORT,
+ table="apache-beam-testing:"
+ "beam_bigquery_io_test."
+ "dfsqltable_3c7d6fd5_16e0460dfd0",
+ output_type='BEAM_ROW'))
+ with beam.Pipeline(argv=self.args) as p:
+ actual_df = p | apache_beam.dataframe.io.read_gbq(
+ table="apache-beam-testing:"
+ "beam_bigquery_io_test."
+ "dfsqltable_3c7d6fd5_16e0460dfd0")
+ assert_that(expected_df, equal_to(actual_df))
Review Comment:
It's a bit odd to have two pipelines like this and verify them against each
other. I also wouldn't expect this to work with a deferred dataframe instance.
Is it actually working?
I think it would be preferable to just have an explicit expected value, as
in the other ITs. Can we do that instead?
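For example, something like this inside the test class, reusing its existing
imports (a sketch; `EXPECTED_ROWS` is a placeholder for the actual contents of
the test table, as `beam.Row`s or named tuples):
```python
  # needs `from apache_beam.dataframe import convert` at module level

  @pytest.mark.it_postcommit
  def test_ReadGbq(self):
    with beam.Pipeline(argv=self.args) as p:
      df = p | apache_beam.dataframe.io.read_gbq(
          table="apache-beam-testing:"
          "beam_bigquery_io_test."
          "dfsqltable_3c7d6fd5_16e0460dfd0")
      # Convert the deferred dataframe back into a PCollection so it can
      # be compared against a hard-coded expected value.
      assert_that(convert.to_pcollection(df), equal_to(EXPECTED_ROWS))
```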
##########
sdks/python/apache_beam/dataframe/io_test.py:
##########
@@ -410,5 +419,50 @@ def test_double_write(self):
set(self.read_all_lines(output + 'out2.csv*')))
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
Review Comment:
What happens with read_gbq if GCP dependencies are not installed? Maybe we
should not define it in that case?
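If we do keep defining it, one option is to fail fast with a helpful message,
reusing the same sentinel the tests already use (a sketch; whether to guard the
definition itself or the call is a judgment call):
```python
try:
  from apitools.base.py.exceptions import HttpError
except ImportError:
  HttpError = None


def read_gbq(table, use_bqstorage_api=False, **kwargs):
  if HttpError is None:
    raise ImportError(
        'Google Cloud BigQuery I/O is not available; install '
        'apache_beam[gcp] to use read_gbq.')
  ...
```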
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]