ajamato commented on a change in pull request #15342:
URL: https://github.com/apache/beam/pull/15342#discussion_r696193292
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -231,17 +266,46 @@ public void close() throws IOException {
.addAllMutations(record.getValue())
.build();
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"google.bigtable.v2.MutateRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID,
session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID,
session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ ServiceCallMetric serviceCallMetric =
+ new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
Futures.addCallback(
new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
new FutureCallback<MutateRowResponse>() {
@Override
public void onSuccess(MutateRowResponse mutateRowResponse) {
result.complete(mutateRowResponse);
+ serviceCallMetric.call("ok");
}
@Override
public void onFailure(Throwable throwable) {
+ if (throwable instanceof ApiException) {
Review comment:
If you can verify that its working when you exercise the error case in
tests, then sounds good to me.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -231,17 +266,46 @@ public void close() throws IOException {
.addAllMutations(record.getValue())
.build();
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"google.bigtable.v2.MutateRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID,
session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID,
session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ ServiceCallMetric serviceCallMetric =
+ new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
Futures.addCallback(
new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
new FutureCallback<MutateRowResponse>() {
@Override
public void onSuccess(MutateRowResponse mutateRowResponse) {
result.complete(mutateRowResponse);
+ serviceCallMetric.call("ok");
}
@Override
public void onFailure(Throwable throwable) {
+ if (throwable instanceof ApiException) {
+ serviceCallMetric.call(
+ ((ApiException)
throwable).getStatusCode().getCode().getHttpStatusCode());
+ } else {
+ serviceCallMetric.call(2); // Unknown
Review comment:
Please use this, rather than the number 2.
serviceCallMetric.call("unknown")
##########
File path:
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
##########
@@ -140,4 +155,25 @@ public void testWrite() throws IOException,
InterruptedException {
underTest.close();
verify(mockBulkMutation, times(1)).flush();
}
+
+ private void verifyMetricWasSet(String method, String status, long count) {
+ // Verify the metric as reported.
+ HashMap<String, String> labels = new HashMap<>();
+ labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ labels.put(MonitoringInfoConstants.Labels.METHOD, method);
+ labels.put(MonitoringInfoConstants.Labels.RESOURCE, "");
+ labels.put(MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, PROJECT_ID);
+ labels.put(MonitoringInfoConstants.Labels.INSTANCE_ID, INSTANCE_ID);
+ labels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(PROJECT_ID, INSTANCE_ID,
TABLE_ID));
+ labels.put(MonitoringInfoConstants.Labels.STATUS, status);
+
+ MonitoringInfoMetricName name =
+
MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT,
labels);
+ MetricsContainerImpl container =
+ (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
+ assertEquals(count, (long) container.getCounter(name).getCumulative());
Review comment:
Let's disregard this one. It just exercises one more layer, but I think
we can forgo this on second thought to keep it simpler.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -130,12 +135,40 @@ public boolean start() throws IOException {
String tableNameSr =
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"google.bigtable.v2.ReadRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID,
session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID,
session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ ServiceCallMetric serviceCallMetric =
+ new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
ReadRowsRequest.Builder requestB =
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
if (source.getRowFilter() != null) {
requestB.setFilter(source.getRowFilter());
}
- results = session.getDataClient().readRows(requestB.build());
+ try {
Review comment:
If you can verify that its working when you exercise the error case in
tests, then sounds good to me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]