ajamato commented on a change in pull request #15294:
URL: https://github.com/apache/beam/pull/15294#discussion_r688703484
##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -368,6 +376,15 @@ def process(self, element, spanner_transaction):
for row in transaction_read(**element.kwargs):
yield row
+ table_id = self._spanner_configuration.table
+ query_name = self._spanner_configuration.query_name
+ if element.is_sql:
+ self._query_metric(query_name)
+ elif element.is_table:
+ self._table_metric(table_id)
+ else:
+ pass
Review comment:
rm else block which does nothing
##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -1068,6 +1139,28 @@ def __init__(self, spanner_configuration):
self._spanner_configuration = spanner_configuration
self._db_instance = None
self.batches = Metrics.counter(self.__class__, 'SpannerBatches')
+ self.base_labels = {
+ monitoring_infos.SERVICE_LABEL: 'Spanner',
+ monitoring_infos.METHOD_LABEL: 'Write',
+ monitoring_infos.SPANNER_PROJECT_ID: spanner_configuration.project,
+ monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database,
+ }
+
+ def table_write_service_call_metric(self, table_id):
+ database_id = self._spanner_configuration.database
+ project_id = self._spanner_configuration.project
+ resource = resource_identifiers.SpannerTable(
+ project_id, database_id, table_id)
+ labels = {
+ **self.base_labels,
+ monitoring_infos.RESOURCE_LABEL: resource,
+ monitoring_infos.SPANNER_TABLE_ID: table_id
+ }
+
+ service_call_metric = ServiceCallMetric(
+ request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+ base_labels=labels)
+ service_call_metric.call('ok')
Review comment:
This only records ok status.
Please collect the errors from the API and record those status as well. Find
the grpc status code string or http status code and pass that in.
Please also run an integration test and tweak it to invoke the error and
ensure that is working.
Here is an example.
https://github.com/apache/beam/blob/c5b6a9a1289d4be12011fbfce1796136918db8a9/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L658
But you may need to do something different to properly extract error codes
after the API call is made.
Also it would be more consistent with the code base to user a helper
function in the class itself which, plumbing necessary metadata through. Rather
than passing the function reference around.
##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -284,6 +288,8 @@ class
_BeamSpannerConfiguration(namedtuple("_BeamSpannerConfiguration",
["project",
"instance",
"database",
+ "table",
Review comment:
use table_id as the name here to be consistent with other "table_id"
variable names
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]