ajamato commented on a change in pull request #15294:
URL: https://github.com/apache/beam/pull/15294#discussion_r688703484



##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -368,6 +376,15 @@ def process(self, element, spanner_transaction):
       for row in transaction_read(**element.kwargs):
         yield row
 
+    table_id = self._spanner_configuration.table
+    query_name = self._spanner_configuration.query_name
+    if element.is_sql:
+      self._query_metric(query_name)
+    elif element.is_table:
+      self._table_metric(table_id)
+    else:
+      pass

Review comment:
       Remove the else block; it does nothing.

##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -1068,6 +1139,28 @@ def __init__(self, spanner_configuration):
     self._spanner_configuration = spanner_configuration
     self._db_instance = None
     self.batches = Metrics.counter(self.__class__, 'SpannerBatches')
+    self.base_labels = {
+        monitoring_infos.SERVICE_LABEL: 'Spanner',
+        monitoring_infos.METHOD_LABEL: 'Write',
+        monitoring_infos.SPANNER_PROJECT_ID: spanner_configuration.project,
+        monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database,
+    }
+
+  def table_write_service_call_metric(self, table_id):
+    database_id = self._spanner_configuration.database
+    project_id = self._spanner_configuration.project
+    resource = resource_identifiers.SpannerTable(
+        project_id, database_id, table_id)
+    labels = {
+        **self.base_labels,
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.SPANNER_TABLE_ID: table_id
+    }
+
+    service_call_metric = ServiceCallMetric(
+        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+        base_labels=labels)
+    service_call_metric.call('ok')

Review comment:
       This only records the ok status.
   Please also catch errors from the API and record their statuses. Find the 
gRPC status code string or HTTP status code and pass that in.
   
   Please also run an integration test, tweaked to trigger an error, to 
confirm that error recording works.
   Here is an example:
   
https://github.com/apache/beam/blob/c5b6a9a1289d4be12011fbfce1796136918db8a9/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L658
   
   You may need to do something different to properly extract error codes 
after the API call is made.
   
   Also, it would be more consistent with the code base to use a helper 
function in the class itself, plumbing the necessary metadata through, rather 
than passing the function reference around.
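   
   To make the request concrete, here is a minimal sketch of the shape I have 
in mind: a helper method on the class that records 'ok' on success and the 
error's status code on failure. It is not the Spanner client or Beam API. 
FakeServiceCallMetric, FakeApiError, SpannerWriteSketch, and the metric_factory 
argument are all hypothetical stand-ins so the snippet runs on its own; the 
real code would use ServiceCallMetric and whatever exception type the Spanner 
client actually raises.

```python
class FakeServiceCallMetric:
  """Hypothetical stand-in for ServiceCallMetric; keeps statuses in a list."""
  def __init__(self):
    self.statuses = []

  def call(self, status):
    self.statuses.append(status)


class FakeApiError(Exception):
  """Hypothetical stand-in for a client error that carries a status code."""
  def __init__(self, code):
    super().__init__(code)
    self.code = code


class SpannerWriteSketch:
  """Illustrative only: shows the helper-method pattern, not the real DoFn."""
  def __init__(self, metric_factory):
    # metric_factory(table_id) returns an object with a call(status) method.
    self._metric_factory = metric_factory

  def _table_write_metric(self, table_id, status):
    # Helper on the class itself, so no function reference is passed around.
    self._metric_factory(table_id).call(status)

  def write(self, table_id, commit):
    try:
      commit()
    except FakeApiError as e:
      # Record the failure status, then let the error propagate unchanged.
      self._table_write_metric(table_id, e.code)
      raise
    else:
      self._table_write_metric(table_id, 'ok')
```

   The error is re-raised after the metric is recorded, so existing retry and 
failure handling is unaffected.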
   
   

##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########
@@ -284,6 +288,8 @@ class _BeamSpannerConfiguration(namedtuple("_BeamSpannerConfiguration",
                                            ["project",
                                             "instance",
                                             "database",
+                                            "table",

Review comment:
       Use table_id as the field name here, for consistency with the other 
"table_id" variable names.





