This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 170420b  [BEAM-11994] Update BigQueryServicesImpl to capture API_REQUEST_COUNT metrics for streaming inserts mode
     new 8377478  Merge pull request #14501 from ajamato/bq_metrics_instrument
170420b is described below

commit 170420b70819550b600e5e22b67fcf1d1bf6434a
Author: Alex Amato <[email protected]>
AuthorDate: Fri Apr 9 12:57:37 2021 -0700

    [BEAM-11994] Update BigQueryServicesImpl to capture API_REQUEST_COUNT metrics for streaming inserts mode
---
 .../core/metrics/GcpResourceIdentifiers.java       |  35 +++++++
 .../runners/core/metrics/ServiceCallMetric.java    | 107 +++++++++++++++++++++
 .../core/metrics/GcpResourceIdentifiersTest.java   |  31 ++++++
 .../core/metrics/ServiceCallMetricTest.java        |  69 +++++++++++++
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  85 ++++++++--------
 .../io/gcp/bigquery/BigQueryServicesImplTest.java  |  64 +++++++++++-
 6 files changed, 347 insertions(+), 44 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
new file mode 100644
index 0000000..6ae35d2
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+/**
+ * Helper functions to generate resource label strings for GCP entities. These can be used on
+ * MonitoringInfo 'resource' labels. See example entities at
+ * https://s.apache.org/beam-gcp-debuggability.
+ *
+ * <p>For GCP entities, populate the RESOURCE label with the aip.dev/122 format
+ * (https://google.aip.dev/122). If an official GCP format does not exist, try to use the
+ * following format: //whatever.googleapis.com/parents/{parentId}/whatevers/{whateverId}
+ */
+public class GcpResourceIdentifiers {
+
+  /**
+   * Returns the resource identifier of a BigQuery table, formatted as
+   * {@code //bigquery.googleapis.com/projects/PROJECT/datasets/DATASET/tables/TABLE}.
+   */
+  public static String bigQueryTable(String projectId, String datasetId, String tableId) {
+    return String.format(
+        "//bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s",
+        projectId, datasetId, tableId);
+  }
+}
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ServiceCallMetric.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ServiceCallMetric.java
new file mode 100644
index 0000000..17f55ba
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ServiceCallMetric.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Metric class which records Service API call metrics.
+ *
+ * <p>This class captures a request count metric for the specified request count URN and base
+ * labels. When {@code call()} is invoked, the status must be provided; it is converted to a
+ * canonical GCP status code, if possible.
+ *
+ * <p>The base labels are copied on construction and never modified afterwards: each call builds
+ * its own label map, so an instance can be shared by concurrently running requests and the map
+ * passed by the caller is never mutated.
+ */
+public class ServiceCallMetric {
+
+  /** Maps HTTP status codes to canonical GCP status strings. */
+  public static final Map<Integer, String> CANONICAL_STATUS_MAP =
+      ImmutableMap.<Integer, String>builder()
+          .put(200, "ok")
+          .put(400, "out_of_range")
+          .put(401, "unauthenticated")
+          .put(403, "permission_denied")
+          .put(404, "not_found")
+          .put(409, "already_exists")
+          .put(429, "resource_exhausted")
+          .put(499, "cancelled")
+          .put(500, "internal")
+          .put(501, "not_implemented")
+          .put(503, "unavailable")
+          .put(504, "deadline_exceeded")
+          .build();
+
+  /** Status reported when a status code has no canonical mapping. */
+  public static final String CANONICAL_STATUS_UNKNOWN = "unknown";
+
+  /** Maps lower-cased status strings without separators to their canonical underscored form. */
+  public static final Map<String, String> STATUS_NORMALIZATION_MAP =
+      ImmutableMap.<String, String>builder()
+          .put("outofrange", "out_of_range")
+          .put("permissiondenied", "permission_denied")
+          .put("notfound", "not_found")
+          .put("alreadyexists", "already_exists")
+          .put("resourceexhausted", "resource_exhausted")
+          .put("notimplemented", "not_implemented")
+          .put("unavailable", "unavailable")
+          .put("deadlineexceeded", "deadline_exceeded")
+          .build();
+
+  // Private snapshot of the caller's labels; read-only after construction.
+  private final Map<String, String> baseLabels;
+  private final String requestCountUrn;
+
+  /**
+   * Creates a metric reporter.
+   *
+   * @param requestCountUrn URN of the counter incremented on every call
+   * @param baseLabels labels attached to every reported metric; the map is copied, not retained
+   */
+  public ServiceCallMetric(String requestCountUrn, HashMap<String, String> baseLabels) {
+    this.requestCountUrn = requestCountUrn;
+    this.baseLabels = new HashMap<>(baseLabels);
+  }
+
+  /** Records one call whose outcome is the given HTTP status code. */
+  public void call(int httpStatusCode) {
+    call(ServiceCallMetric.convertToCanonicalStatusString(httpStatusCode));
+  }
+
+  /**
+   * Records one call whose outcome is the given status string. The status is normalized with
+   * {@link #convertToCanonicalStatusString(String)} and stored under the STATUS label.
+   */
+  public void call(String statusCode) {
+    // Build a per-call label map instead of mutating shared state, so concurrent calls with
+    // different statuses cannot clobber each other's STATUS label.
+    HashMap<String, String> callLabels = new HashMap<>(baseLabels);
+    callLabels.put(
+        MonitoringInfoConstants.Labels.STATUS,
+        ServiceCallMetric.convertToCanonicalStatusString(statusCode));
+    MonitoringInfoMetricName name = MonitoringInfoMetricName.named(requestCountUrn, callLabels);
+    Counter counter = LabeledMetrics.counter(name, true);
+    counter.inc();
+  }
+
+  /** Converts an http status code to a canonical GCP status code string. */
+  public static String convertToCanonicalStatusString(int httpStatusCode) {
+    return CANONICAL_STATUS_MAP.getOrDefault(httpStatusCode, CANONICAL_STATUS_UNKNOWN);
+  }
+
+  /**
+   * Converts a status code string to a canonical GCP status code string. This is used to turn
+   * strings like "notFound" into "not_found". If no mapping is known, the lower-cased status code
+   * is returned; a {@code null} status code yields {@link #CANONICAL_STATUS_UNKNOWN}.
+   */
+  public static String convertToCanonicalStatusString(String statusCode) {
+    if (statusCode == null) {
+      return CANONICAL_STATUS_UNKNOWN;
+    }
+    // Locale.ROOT keeps the lookup stable under locales such as Turkish, where "I" lower-cases
+    // to a dotless i and would never match the normalization map.
+    String lowerCased = statusCode.toLowerCase(Locale.ROOT);
+    return STATUS_NORMALIZATION_MAP.getOrDefault(lowerCased, lowerCased);
+  }
+}
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiersTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiersTest.java
new file mode 100644
index 0000000..97afa09
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiersTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for {@link GcpResourceIdentifiers}. */
+public class GcpResourceIdentifiersTest {
+
+  @Test
+  public void testBigQueryTable() {
+    String expected =
+        "//bigquery.googleapis.com/projects/myProject/datasets/myDataset/tables/myTableId";
+    String actual = GcpResourceIdentifiers.bigQueryTable("myProject", "myDataset", "myTableId");
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ServiceCallMetricTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ServiceCallMetricTest.java
new file mode 100644
index 0000000..182ebd3
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ServiceCallMetricTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import java.util.HashMap;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ServiceCallMetricTest {
+  // Each step reports one call, then builds the expected metric name by overwriting STATUS.
+  @Test
+  public void testCall() {
+    // Install a fresh process-wide container so the counters can be read back from it below.
+    MetricsContainerImpl container = new MetricsContainerImpl(null);
+    MetricsEnvironment.setProcessWideContainer(container);
+
+    String urn = MonitoringInfoConstants.Urns.API_REQUEST_COUNT;
+    HashMap<String, String> labels = new HashMap<String, String>();
+    labels.put("key", "value");
+    ServiceCallMetric metric =
+        new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels);
+
+    // A known HTTP status code maps to its canonical name (200 -> "ok").
+    metric.call(200);
+    labels.put(MonitoringInfoConstants.Labels.STATUS, "ok");
+    MonitoringInfoMetricName name = MonitoringInfoMetricName.named(urn, labels);
+    Assert.assertEquals(1, (long) container.getCounter(name).getCumulative());
+
+    // Normalize the status by lower casing and mapping to a canonical name with underscores.
+    metric.call("notFound");
+    labels.put(MonitoringInfoConstants.Labels.STATUS, "not_found");
+    name = MonitoringInfoMetricName.named(urn, labels);
+    Assert.assertEquals(1, (long) container.getCounter(name).getCumulative());
+
+    // Normalization is case-insensitive: all-caps input maps to the same canonical name.
+    metric.call("PERMISSIONDENIED");
+    labels.put(MonitoringInfoConstants.Labels.STATUS, "permission_denied");
+    name = MonitoringInfoMetricName.named(urn, labels);
+    Assert.assertEquals(1, (long) container.getCounter(name).getCumulative());
+
+    // Accept other string codes passed in, even if they aren't in the canonical map.
+    metric.call("something_else");
+    labels.put(MonitoringInfoConstants.Labels.STATUS, "something_else");
+    name = MonitoringInfoMetricName.named(urn, labels);
+    Assert.assertEquals(1, (long) container.getCounter(name).getCumulative());
+
+    // Map unknown numeric codes to "unknown".
+    metric.call(123);
+    labels.put(MonitoringInfoConstants.Labels.STATUS, "unknown");
+    name = MonitoringInfoMetricName.named(urn, labels);
+    Assert.assertEquals(1, (long) container.getCounter(name).getCumulative());
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index fd32a84..5e9cdb7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -37,6 +37,7 @@ import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.Bigquery.Tables;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfiguration;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -97,9 +98,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.beam.runners.core.metrics.LabeledMetrics;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
-import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
@@ -131,8 +132,9 @@ import org.slf4j.LoggerFactory;
  * service.
  */
 @SuppressWarnings({"keyfor", "nullness"}) // 
TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-class BigQueryServicesImpl implements BigQueryServices {
 
+// TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+class BigQueryServicesImpl implements BigQueryServices {
   private static final Logger LOG = 
LoggerFactory.getLogger(BigQueryServicesImpl.class);
 
   // How frequently to log while polling.
@@ -449,6 +451,7 @@ class BigQueryServicesImpl implements BigQueryServices {
 
   @VisibleForTesting
   static class DatasetServiceImpl implements DatasetService {
+    // Backoff: 200ms * 1.5 ^ n, n=[1,5]
     private static final FluentBackoff INSERT_BACKOFF_FACTORY =
         
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
 
@@ -470,24 +473,6 @@ class BigQueryServicesImpl implements BigQueryServices {
 
     private ExecutorService executor;
 
-    protected static final Map<Integer, String> CANONICAL_STATUS_MAP =
-        ImmutableMap.<Integer, String>builder()
-            .put(200, "ok")
-            .put(400, "out_of_range")
-            .put(401, "unauthenticated")
-            .put(403, "permission_denied")
-            .put(404, "not_found")
-            .put(409, "already_exists")
-            .put(429, "resource_exhausted")
-            .put(499, "cancelled")
-            .put(500, "internal")
-            .put(501, "not_implemented")
-            .put(503, "unavailable")
-            .put(504, "deadline_exceeded")
-            .build();
-
-    protected static final String CANONICAL_STATUS_UNKNOWN = "unknown";
-
     @VisibleForTesting
     DatasetServiceImpl(
         Bigquery client, @Nullable BigQueryWriteClient newWriteClient, 
PipelineOptions options) {
@@ -853,6 +838,24 @@ class BigQueryServicesImpl implements BigQueryServices {
       if (!ignoreInsertIds) {
         idsToPublish = insertIdList;
       }
+
+      HashMap<String, String> baseLabels = new HashMap<String, String>();
+      // TODO(ajamato): Add Ptransform label. Populate it as empty for now to 
prevent the
+      // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, 
"BigQueryBatchWrite");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigQueryTable(
+              ref.getProjectId(), ref.getDatasetId(), ref.getTableId()));
+      baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, 
ref.getProjectId());
+      baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET, 
ref.getDatasetId());
+      baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, 
ref.getTableId());
+
+      ServiceCallMetric serviceCallMetric =
+          new 
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
       while (true) {
         List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new 
ArrayList<>();
         List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() : 
null;
@@ -896,6 +899,8 @@ class BigQueryServicesImpl implements BigQueryServices {
                     .insertAll(ref.getProjectId(), ref.getDatasetId(), 
ref.getTableId(), content)
                     .setPrettyPrint(false);
 
+            // Create final reference (which cannot change).
+            // So the lambda expression can refer to rowsInsertedForRequest to use on error.
             futures.add(
                 executor.submit(
                     () -> {
@@ -905,13 +910,25 @@ class BigQueryServicesImpl implements BigQueryServices {
                       long totalBackoffMillis = 0L;
                       while (true) {
                         try {
-                          return insert.execute().getInsertErrors();
+                          List<TableDataInsertAllResponse.InsertErrors> 
response =
+                              insert.execute().getInsertErrors();
+                          if (response == null || response.isEmpty()) {
+                            serviceCallMetric.call("ok");
+                          } else {
+                            for (TableDataInsertAllResponse.InsertErrors 
insertErrors : response) {
+                              for (ErrorProto insertError : 
insertErrors.getErrors()) {
+                                
serviceCallMetric.call(insertError.getReason());
+                              }
+                            }
+                          }
+                          return response;
                         } catch (IOException e) {
-                          recordError(e);
                           GoogleJsonError.ErrorInfo errorInfo = 
getErrorInfo(e);
                           if (errorInfo == null) {
+                            
serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
                             throw e;
                           }
+                          serviceCallMetric.call(errorInfo.getReason());
                           /**
                            * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error 
will be replaced by
                            * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after 
the next release of
@@ -958,15 +975,18 @@ class BigQueryServicesImpl implements BigQueryServices {
             if (errors == null) {
               continue;
             }
+
             for (TableDataInsertAllResponse.InsertErrors error : errors) {
               if (error.getIndex() == null) {
                 throw new IOException("Insert failed: " + error + ", other 
errors: " + allErrors);
               }
-
               int errorIndex = error.getIndex().intValue() + 
strideIndices.get(i);
               if (retryPolicy.shouldRetry(new 
InsertRetryPolicy.Context(error))) {
                 allErrors.add(error);
                 retryRows.add(rowsToPublish.get(errorIndex));
+                // TODO (BEAM-12139): Select the retry rows(using errorIndex) 
from the batch of rows
+                // which attempted insertion in this call. Not the entire set 
of rows in
+                // rowsToPublish.
                 if (retryIds != null) {
                   retryIds.add(idsToPublish.get(errorIndex));
                 }
@@ -1045,23 +1065,6 @@ class BigQueryServicesImpl implements BigQueryServices {
       return errorInfo;
     }
 
-    protected void recordError(IOException e) {
-      if (e instanceof GoogleJsonResponseException) {
-        int errorCode = ((GoogleJsonResponseException) 
e).getDetails().getCode();
-        String canonicalGcpStatus =
-            CANONICAL_STATUS_MAP.getOrDefault(errorCode, 
CANONICAL_STATUS_UNKNOWN);
-        LabeledMetrics.counter(
-                MonitoringInfoMetricName.named(
-                    MonitoringInfoConstants.Urns.API_REQUEST_COUNT,
-                    ImmutableMap.<String, String>builder()
-                        .putAll(API_METRIC_LABEL)
-                        .put(MonitoringInfoConstants.Labels.STATUS, 
canonicalGcpStatus)
-                        .build()),
-                true)
-            .inc();
-      }
-    }
-
     @Override
     public Table patchTableDescription(
         TableReference tableReference, @Nullable String tableDescription)
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 6a0c9cb..52543db 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -68,13 +68,19 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
 import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
 import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -132,6 +138,10 @@ public class BigQueryServicesImplTest {
         new Bigquery.Builder(
                 transport, Transport.getJsonFactory(), new 
RetryHttpRequestInitializer())
             .build();
+
+    // Setup the ProcessWideContainer for testing metrics are set.
+    MetricsContainerImpl container = new MetricsContainerImpl(null);
+    MetricsEnvironment.setProcessWideContainer(container);
   }
 
   @FunctionalInterface
@@ -169,6 +179,30 @@ public class BigQueryServicesImplTest {
     }
   }
 
+  private void verifyWriteMetricWasSet(
+      String projectId, String dataset, String table, String status, long 
count) {
+    // Verify the metric as reported.
+    HashMap<String, String> labels = new HashMap<String, String>();
+    // TODO(ajamato): Add Ptransform label. Populate it as empty for now to 
prevent the
+    // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
+    labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+    labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+    labels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
+    labels.put(
+        MonitoringInfoConstants.Labels.RESOURCE,
+        GcpResourceIdentifiers.bigQueryTable(projectId, dataset, table));
+    labels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, projectId);
+    labels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET, dataset);
+    labels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, table);
+    labels.put(MonitoringInfoConstants.Labels.STATUS, status);
+
+    MonitoringInfoMetricName name =
+        
MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, 
labels);
+    MetricsContainerImpl container =
+        (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer();
+    assertEquals(count, (long) container.getCounter(name).getCumulative());
+  }
+
   /** Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} 
succeeds. */
   @Test
   public void testStartLoadJobSucceeds() throws IOException, 
InterruptedException {
@@ -625,6 +659,8 @@ public class BigQueryServicesImplTest {
 
     verifyAllResponsesAreRead();
     expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
+
+    verifyWriteMetricWasSet("project", "dataset", "table", 
"ratelimitexceeded", 1);
   }
 
   /** Tests that {@link DatasetServiceImpl#insertAll} retries quota exceeded 
attempts. */
@@ -667,6 +703,8 @@ public class BigQueryServicesImplTest {
 
     verifyAllResponsesAreRead();
     expectedLogs.verifyInfo("BigQuery insertAll error, retrying:");
+
+    verifyWriteMetricWasSet("project", "dataset", "table", "quotaexceeded", 1);
   }
 
   /** Tests that {@link DatasetServiceImpl#insertAll} can stop quotaExceeded 
retry attempts. */
@@ -720,6 +758,8 @@ public class BigQueryServicesImplTest {
         false);
 
     verifyAllResponsesAreRead();
+
+    verifyWriteMetricWasSet("project", "dataset", "table", "quotaexceeded", 1);
   }
 
   // A BackOff that makes a total of 4 attempts
@@ -776,6 +816,9 @@ public class BigQueryServicesImplTest {
         false);
 
     verifyAllResponsesAreRead();
+
+    verifyWriteMetricWasSet("project", "dataset", "table", "unknown", 1);
+    verifyWriteMetricWasSet("project", "dataset", "table", "ok", 1);
   }
 
   /** Tests that {@link DatasetServiceImpl#insertAll} fails gracefully when 
persistent issues. */
@@ -786,13 +829,18 @@ public class BigQueryServicesImplTest {
     List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
         ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow()));
 
+    ErrorProto errorProto = new ErrorProto().setReason("schemaMismatch");
     final TableDataInsertAllResponse row1Failed =
         new TableDataInsertAllResponse()
-            .setInsertErrors(ImmutableList.of(new 
InsertErrors().setIndex(1L)));
+            .setInsertErrors(
+                ImmutableList.of(
+                    new 
InsertErrors().setIndex(1L).setErrors(ImmutableList.of(errorProto))));
 
     final TableDataInsertAllResponse row0Failed =
         new TableDataInsertAllResponse()
-            .setInsertErrors(ImmutableList.of(new 
InsertErrors().setIndex(0L)));
+            .setInsertErrors(
+                ImmutableList.of(
+                    new 
InsertErrors().setIndex(0L).setErrors(ImmutableList.of(errorProto))));
 
     MockSetupFunction row0FailureResponseFunction =
         response -> {
@@ -837,12 +885,14 @@ public class BigQueryServicesImplTest {
     } catch (IOException e) {
       assertThat(e, instanceOf(IOException.class));
       assertThat(e.getMessage(), containsString("Insert failed:"));
-      assertThat(e.getMessage(), containsString("[{\"index\":0}]"));
+      assertThat(e.getMessage(), 
containsString("[{\"errors\":[{\"reason\":\"schemaMismatch\"}]"));
     }
 
     // Verify the exact number of retries as well as log messages.
     verifyAllResponsesAreRead();
     expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
+
+    verifyWriteMetricWasSet("project", "dataset", "table", "schemamismatch", 
4);
   }
 
   /**
@@ -856,6 +906,8 @@ public class BigQueryServicesImplTest {
     List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows = new 
ArrayList<>();
     rows.add(wrapValue(new TableRow()));
 
+    final TableDataInsertAllResponse allRowsSucceeded =
+        new TableDataInsertAllResponse().setInsertErrors(ImmutableList.of());
     // First response is 403 non-{rate-limited, quota-exceeded}, second 
response has valid payload
     // but should not be invoked.
     setupMockResponses(
@@ -898,6 +950,8 @@ public class BigQueryServicesImplTest {
       verify(responses[1], never()).getContent();
       verify(responses[1], never()).getContentType();
     }
+
+    verifyWriteMetricWasSet("project", "dataset", "table", "actually 
forbidden", 1);
   }
 
   /**
@@ -972,6 +1026,8 @@ public class BigQueryServicesImplTest {
         false);
     assertEquals(1, failedInserts.size());
     expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
+
+    verifyWriteMetricWasSet("project", "dataset", "table", "timeout", 2);
   }
 
   /**
@@ -1042,6 +1098,8 @@ public class BigQueryServicesImplTest {
     assertTrue(parsedRequest.getIgnoreUnknownValues());
     assertNull(parsedRequest.getRows().get(0).getInsertId());
     assertNull(parsedRequest.getRows().get(1).getInsertId());
+
+    verifyWriteMetricWasSet("project", "dataset", "table", "ok", 2);
   }
 
   /** A helper to convert a string response back to a {@link GenericJson} 
subclass. */

Reply via email to