ajamato commented on a change in pull request #15328:
URL: https://github.com/apache/beam/pull/15328#discussion_r688690771
##########
File path: sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
##########
@@ -395,17 +416,34 @@ def write_mutations(self, throttler, rpc_stats_callback,
throttle_delay=1):
for element in self._batch_elements:
self.add_to_batch(element)
+ # Create request count metric
+ resource = resource_identifiers.DatastoreNamespace(self._project, None)
+ labels = {
+ monitoring_infos.SERVICE_LABEL: 'Datastore',
+ monitoring_infos.METHOD_LABEL: 'BatchDatastoreWrite',
+ monitoring_infos.RESOURCE_LABEL: resource,
+ monitoring_infos.DATASTORE_NAMESPACE_LABEL: None,
+ monitoring_infos.DATASTORE_PROJECT_ID_LABEL: self._project,
+ monitoring_infos.STATUS_LABEL: 'ok'
+ }
+
+ service_call_metric = ServiceCallMetric(
+ request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
+ base_labels=labels)
+
try:
start_time = time.time()
self._batch.commit()
end_time = time.time()
+ service_call_metric.call('ok')
rpc_stats_callback(successes=1)
throttler.successful_request(start_time * 1000)
commit_time_ms = int((end_time - start_time) * 1000)
return commit_time_ms
- except Exception:
+ except Exception as e:
self._batch = None
+ service_call_metric.call(e.code)
Review comment:
This won't work: the base Exception class does not have a .code
attribute.
Please catch the exception properly, using the specific exception type, or
otherwise determine how to get the gRPC/HTTP error code out of this client API
and pass it in here.
Please ask on the Beam dev list for a way to run an integration test, and
make sure this works through testing.
Log the exception so you can confirm that the error code can actually be
pulled out of it.
Make sure this exception branch doesn't raise another exception (e.g. an
AttributeError) when it runs.
##########
File path: sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py
##########
@@ -338,6 +346,26 @@ def test_DatastoreWriteLargeEntities(self):
self.assertEqual(2, commit_count[0])
+ def test_DatastoreWrite_monitoring_info(self):
+ num_entities_to_write = 1
+ self.check_DatastoreWriteFn(num_entities_to_write)
+ resource = resource_identifiers.DatastoreNamespace(self._PROJECT, None)
+ labels = {
+ monitoring_infos.SERVICE_LABEL: 'Datastore',
+ monitoring_infos.METHOD_LABEL: 'BatchDatastoreWrite',
+ monitoring_infos.RESOURCE_LABEL: resource,
+ monitoring_infos.DATASTORE_NAMESPACE_LABEL: None,
+ monitoring_infos.DATASTORE_PROJECT_ID_LABEL: self._PROJECT,
+ monitoring_infos.STATUS_LABEL: 'ok'
+ }
+
+ metric_name = MetricName(
+ None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
+ metric_value = MetricsEnvironment.process_wide_container().get_counter(
Review comment:
Would you mind testing it like this:
https://github.com/apache/beam/blob/39cf3fccbb3950b9fd0c8938301d192c3ca85cce/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py#L436
Specifically, pull out the MonitoringInfo objects with a call to
to_runner_api_monitoring_infos and test those, instead of testing with
get_counter.
The point of this is to make sure that the monitoring_infos generated and
sent to the worker harness are as we expect.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]