[
https://issues.apache.org/jira/browse/BEAM-11981?focusedWorklogId=642057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-642057
]
ASF GitHub Bot logged work on BEAM-11981:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Aug/21 00:13
Start Date: 26/Aug/21 00:13
Worklog Time Spent: 10m
Work Description: ajamato commented on a change in pull request #15342:
URL: https://github.com/apache/beam/pull/15342#discussion_r696193292
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -231,17 +266,46 @@ public void close() throws IOException {
.addAllMutations(record.getValue())
.build();
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"google.bigtable.v2.MutateRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID,
session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID,
session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ ServiceCallMetric serviceCallMetric =
+ new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
Futures.addCallback(
new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
new FutureCallback<MutateRowResponse>() {
@Override
public void onSuccess(MutateRowResponse mutateRowResponse) {
result.complete(mutateRowResponse);
+ serviceCallMetric.call("ok");
}
@Override
public void onFailure(Throwable throwable) {
+ if (throwable instanceof ApiException) {
Review comment:
If you can verify that it's working when you exercise the error case in
tests, then that sounds good to me.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -231,17 +266,46 @@ public void close() throws IOException {
.addAllMutations(record.getValue())
.build();
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"google.bigtable.v2.MutateRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID,
session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID,
session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ tableName.getTableId()));
+ ServiceCallMetric serviceCallMetric =
+ new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
Futures.addCallback(
new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
new FutureCallback<MutateRowResponse>() {
@Override
public void onSuccess(MutateRowResponse mutateRowResponse) {
result.complete(mutateRowResponse);
+ serviceCallMetric.call("ok");
}
@Override
public void onFailure(Throwable throwable) {
+ if (throwable instanceof ApiException) {
+ serviceCallMetric.call(
+ ((ApiException)
throwable).getStatusCode().getCode().getHttpStatusCode());
+ } else {
+ serviceCallMetric.call(2); // Unknown
Review comment:
Please use this rather than the number 2:
serviceCallMetric.call("unknown")
##########
File path:
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
##########
@@ -140,4 +155,25 @@ public void testWrite() throws IOException,
InterruptedException {
underTest.close();
verify(mockBulkMutation, times(1)).flush();
}
+
+ private void verifyMetricWasSet(String method, String status, long count) {
+ // Verify the metric as reported.
+ HashMap<String, String> labels = new HashMap<>();
+ labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ labels.put(MonitoringInfoConstants.Labels.METHOD, method);
+ labels.put(MonitoringInfoConstants.Labels.RESOURCE, "");
+ labels.put(MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, PROJECT_ID);
+ labels.put(MonitoringInfoConstants.Labels.INSTANCE_ID, INSTANCE_ID);
+ labels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(PROJECT_ID, INSTANCE_ID,
TABLE_ID));
+ labels.put(MonitoringInfoConstants.Labels.STATUS, status);
+
+ MonitoringInfoMetricName name =
+
MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT,
labels);
+ MetricsContainerImpl container =
+ (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
+ assertEquals(count, (long) container.getCounter(name).getCumulative());
Review comment:
Let's disregard this one. It just exercises one more layer, but on second
thought I think we can forgo it to keep things simpler.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -130,12 +135,40 @@ public boolean start() throws IOException {
String tableNameSr =
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ HashMap<String, String> baseLabels = new HashMap<>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"google.bigtable.v2.ReadRows");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigtableResource(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID,
session.getOptions().getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.INSTANCE_ID,
session.getOptions().getInstanceId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.TABLE_ID,
+ GcpResourceIdentifiers.bigtableTableID(
+ session.getOptions().getProjectId(),
+ session.getOptions().getInstanceId(),
+ source.getTableId().get()));
+ ServiceCallMetric serviceCallMetric =
+ new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
ReadRowsRequest.Builder requestB =
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
if (source.getRowFilter() != null) {
requestB.setFilter(source.getRowFilter());
}
- results = session.getDataClient().readRows(requestB.build());
+ try {
Review comment:
If you can verify that it's working when you exercise the error case in
tests, then that sounds good to me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 642057)
Time Spent: 5h 50m (was: 5h 40m)
> Java Bigtable - Implement IO Request Count metrics
> --------------------------------------------------
>
> Key: BEAM-11981
> URL: https://issues.apache.org/jira/browse/BEAM-11981
> Project: Beam
> Issue Type: Test
> Components: io-java-gcp
> Reporter: Alex Amato
> Priority: P3
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> Reference PRs (see the BigQuery IO example) and a detailed explanation of
> what's needed to instrument this IO with Request Count metrics can be found
> in this handoff doc:
> [https://docs.google.com/document/d/1lrz2wE5Dl4zlUfPAenjXIQyleZvqevqoxhyE85aj4sc/edit|https://docs.google.com/document/d/1lrz2wE5Dl4zlUfPAenjXIQyleZvqevqoxhyE85aj4sc/edit?authuser=0]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)