ajamato commented on a change in pull request #15342:
URL: https://github.com/apache/beam/pull/15342#discussion_r696193292



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -231,17 +266,46 @@ public void close() throws IOException {
               .addAllMutations(record.getValue())
               .build();
 
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, 
"google.bigtable.v2.MutateRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              tableName.getTableId()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, 
session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, 
session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              tableName.getTableId()));
+      ServiceCallMetric serviceCallMetric =
+          new 
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
       CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
       Futures.addCallback(
           new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
           new FutureCallback<MutateRowResponse>() {
             @Override
             public void onSuccess(MutateRowResponse mutateRowResponse) {
               result.complete(mutateRowResponse);
+              serviceCallMetric.call("ok");
             }
 
             @Override
             public void onFailure(Throwable throwable) {
+              if (throwable instanceof ApiException) {

Review comment:
       If you can verify that its working when you exercise the error case in 
tests, then sounds good to me.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -231,17 +266,46 @@ public void close() throws IOException {
               .addAllMutations(record.getValue())
               .build();
 
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, 
"google.bigtable.v2.MutateRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              tableName.getTableId()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, 
session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, 
session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              tableName.getTableId()));
+      ServiceCallMetric serviceCallMetric =
+          new 
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
       CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
       Futures.addCallback(
           new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
           new FutureCallback<MutateRowResponse>() {
             @Override
             public void onSuccess(MutateRowResponse mutateRowResponse) {
               result.complete(mutateRowResponse);
+              serviceCallMetric.call("ok");
             }
 
             @Override
             public void onFailure(Throwable throwable) {
+              if (throwable instanceof ApiException) {
+                serviceCallMetric.call(
+                    ((ApiException) 
throwable).getStatusCode().getCode().getHttpStatusCode());
+              } else {
+                serviceCallMetric.call(2); // Unknown

Review comment:
       Please use this, rather than the number 2.
   serviceCallMetric.call("unknown")

##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
##########
@@ -140,4 +155,25 @@ public void testWrite() throws IOException, 
InterruptedException {
     underTest.close();
     verify(mockBulkMutation, times(1)).flush();
   }
+
+  private void verifyMetricWasSet(String method, String status, long count) {
+    // Verify the metric as reported.
+    HashMap<String, String> labels = new HashMap<>();
+    labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+    labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+    labels.put(MonitoringInfoConstants.Labels.METHOD, method);
+    labels.put(MonitoringInfoConstants.Labels.RESOURCE, "");
+    labels.put(MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, PROJECT_ID);
+    labels.put(MonitoringInfoConstants.Labels.INSTANCE_ID, INSTANCE_ID);
+    labels.put(
+        MonitoringInfoConstants.Labels.TABLE_ID,
+        GcpResourceIdentifiers.bigtableTableID(PROJECT_ID, INSTANCE_ID, 
TABLE_ID));
+    labels.put(MonitoringInfoConstants.Labels.STATUS, status);
+
+    MonitoringInfoMetricName name =
+        
MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, 
labels);
+    MetricsContainerImpl container =
+        (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
+    assertEquals(count, (long) container.getCounter(name).getCumulative());

Review comment:
       Let's disregard this one. It just exercises one more layer, but I think 
we can forgo this on second thought to keep it simpler.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -130,12 +135,40 @@ public boolean start() throws IOException {
       String tableNameSr =
           
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
 
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, 
"google.bigtable.v2.ReadRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, 
session.getOptions().getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.INSTANCE_ID, 
session.getOptions().getInstanceId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(
+              session.getOptions().getProjectId(),
+              session.getOptions().getInstanceId(),
+              source.getTableId().get()));
+      ServiceCallMetric serviceCallMetric =
+          new 
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
       ReadRowsRequest.Builder requestB =
           
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
       if (source.getRowFilter() != null) {
         requestB.setFilter(source.getRowFilter());
       }
-      results = session.getDataClient().readRows(requestB.build());
+      try {

Review comment:
       If you can verify that its working when you exercise the error case in 
tests, then sounds good to me.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to