This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 32e7a6ab750 Update BigQuerySinkMetrics for StreamingInserts. (#30320)
32e7a6ab750 is described below
commit 32e7a6ab750a11c589f92393def98d7af9555051
Author: JayajP <[email protected]>
AuthorDate: Thu Mar 21 14:35:57 2024 -0700
Update BigQuerySinkMetrics for StreamingInserts. (#30320)
* Update BigQuerySinkMetrics for StreamingInserts.
* Spotless/add additional comments
* Address comments
* Address comments and some minor optimizations
Revert changes to histogram bucket widths
* Add functionality to completely disable new streaming inserts metrics
* Remove unnecessary qualifiers in StreamingInsertsMetrics
* fix unit tests
---
.../dataflow/worker/StreamingDataflowWorker.java | 21 +-
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 30 ++-
.../sdk/io/gcp/bigquery/BigQuerySinkMetrics.java | 30 ++-
.../io/gcp/bigquery/StreamingInsertsMetrics.java | 234 +++++++++++++++++++++
.../io/gcp/bigquery/BigQueryServicesImplTest.java | 163 ++++++++++++++
.../io/gcp/bigquery/BigQuerySinkMetricsTest.java | 34 +++
.../gcp/bigquery/StreamingInsertsMetricsTest.java | 179 ++++++++++++++++
7 files changed, 674 insertions(+), 17 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 4c3ffd08a0b..c3e820767cd 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -579,13 +579,10 @@ public class StreamingDataflowWorker {
// metrics.
MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
- // When enabled, the Pipeline will record Per-Worker metrics that will be
piped to DFE.
- StreamingStepMetricsContainer.setEnablePerWorkerMetrics(
- options.isEnableStreamingEngine()
- && DataflowRunner.hasExperiment(options,
"enable_per_worker_metrics"));
- // StreamingStepMetricsContainer automatically deletes perWorkerCounters
if they are zero-valued
- // for longer than 5 minutes.
- BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+ if (options.isEnableStreamingEngine()
+ && DataflowRunner.hasExperiment(options, "enable_per_worker_metrics"))
{
+ enableBigQueryMetrics();
+ }
JvmInitializers.runBeforeProcessing(options);
worker.startStatusPages();
@@ -672,6 +669,16 @@ public class StreamingDataflowWorker {
return maxMem > 0 ? maxMem : (Runtime.getRuntime().maxMemory() / 2);
}
+ private static void enableBigQueryMetrics() {
+ // When enabled, the Pipeline will record Per-Worker metrics that will be
piped to DFE.
+ StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
+ // StreamingStepMetricsContainer automatically deletes perWorkerCounters
if they are zero-valued
+ // for longer than 5 minutes.
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+ // Support metrics for BigQuery's Streaming Inserts write method.
+ BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
+ }
+
void addStateNameMappings(Map<String, String> nameMap) {
stateNameMap.putAll(nameMap);
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index bb3b99f6fcd..2dbc02131f5 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -95,6 +95,7 @@ import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.protobuf.ProtoUtils;
import java.io.IOException;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -937,6 +938,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
private final List<TableDataInsertAllRequest.Rows> rows;
private final AtomicLong maxThrottlingMsec;
private final Sleeper sleeper;
+ private final StreamingInsertsMetrics result;
InsertBatchofRowsCallable(
TableReference ref,
@@ -946,7 +948,8 @@ public class BigQueryServicesImpl implements
BigQueryServices {
FluentBackoff rateLimitBackoffFactory,
List<TableDataInsertAllRequest.Rows> rows,
AtomicLong maxThrottlingMsec,
- Sleeper sleeper) {
+ Sleeper sleeper,
+ StreamingInsertsMetrics result) {
this.ref = ref;
this.skipInvalidRows = skipInvalidRows;
this.ignoreUnkownValues = ignoreUnknownValues;
@@ -955,6 +958,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
this.rows = rows;
this.maxThrottlingMsec = maxThrottlingMsec;
this.sleeper = sleeper;
+ this.result = result;
}
@Override
@@ -975,6 +979,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
long totalBackoffMillis = 0L;
while (true) {
ServiceCallMetric serviceCallMetric =
BigQueryUtils.writeCallMetric(ref);
+ Instant start = Instant.now();
try {
List<TableDataInsertAllResponse.InsertErrors> response =
insert.execute().getInsertErrors();
@@ -987,14 +992,18 @@ public class BigQueryServicesImpl implements
BigQueryServices {
}
}
}
+ result.updateSuccessfulRpcMetrics(start, Instant.now());
return response;
} catch (IOException e) {
GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
if (errorInfo == null) {
serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
+ result.updateFailedRpcMetrics(start, start,
BigQuerySinkMetrics.UNKNOWN);
throw e;
}
- serviceCallMetric.call(errorInfo.getReason());
+ String errorReason = errorInfo.getReason();
+ serviceCallMetric.call(errorReason);
+ result.updateFailedRpcMetrics(start, Instant.now(), errorReason);
/**
* TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be
replaced by
* ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next
release of
@@ -1031,6 +1040,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
totalBackoffMillis += nextBackOffMillis;
final long totalBackoffMillisSoFar = totalBackoffMillis;
maxThrottlingMsec.getAndUpdate(current -> Math.max(current,
totalBackoffMillisSoFar));
+ result.updateRetriedRowsWithStatus(errorReason, rows.size());
} catch (InterruptedException interrupted) {
throw new IOException("Interrupted while waiting before retrying
insertAll");
}
@@ -1067,7 +1077,8 @@ public class BigQueryServicesImpl implements
BigQueryServices {
"If insertIdList is not null it needs to have at least "
+ "as many elements as rowList");
}
-
+ StreamingInsertsMetrics streamingInsertsResults =
+ BigQuerySinkMetrics.streamingInsertsMetrics();
final Set<Integer> failedIndices = new HashSet<>();
long retTotalDataSize = 0;
List<TableDataInsertAllResponse.InsertErrors> allErrors = new
ArrayList<>();
@@ -1124,6 +1135,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
+ " pipeline, and the row will be output as a failed
insert.",
nextRowSize));
} else {
+ streamingInsertsResults.incrementFailedRows();
errorContainer.add(failedInserts, error, ref,
rowsToPublish.get(rowIndex));
failedIndices.add(rowIndex);
rowIndex++;
@@ -1150,7 +1162,8 @@ public class BigQueryServicesImpl implements
BigQueryServices {
rateLimitBackoffFactory,
rows,
maxThrottlingMsec,
- sleeper)));
+ sleeper,
+ streamingInsertsResults)));
strideIndices.add(strideIndex);
retTotalDataSize += dataSize;
strideIndex = rowIndex;
@@ -1180,7 +1193,8 @@ public class BigQueryServicesImpl implements
BigQueryServices {
rateLimitBackoffFactory,
rows,
maxThrottlingMsec,
- sleeper)));
+ sleeper,
+ streamingInsertsResults)));
strideIndices.add(strideIndex);
retTotalDataSize += dataSize;
rows = new ArrayList<>();
@@ -1209,6 +1223,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
retryIds.add(idsToPublish.get(errorIndex));
}
} else {
+ streamingInsertsResults.incrementFailedRows();
errorContainer.add(failedInserts, error, ref,
rowsToPublish.get(errorIndex));
}
}
@@ -1219,6 +1234,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while inserting " +
rowsToPublish);
} catch (ExecutionException e) {
+ streamingInsertsResults.updateStreamingInsertsMetrics(ref);
throw new RuntimeException(e.getCause());
}
@@ -1237,6 +1253,8 @@ public class BigQueryServicesImpl implements
BigQueryServices {
}
rowsToPublish = retryRows;
idsToPublish = retryIds;
+ streamingInsertsResults.updateRetriedRowsWithStatus(
+ BigQuerySinkMetrics.INTERNAL, retryRows.size());
// print first 5 failures
int numErrorToLog = Math.min(allErrors.size(), 5);
LOG.info(
@@ -1258,6 +1276,8 @@ public class BigQueryServicesImpl implements
BigQueryServices {
}
}
}
+ streamingInsertsResults.updateSuccessfulAndFailedRows(rowList.size(),
allErrors.size());
+ streamingInsertsResults.updateStreamingInsertsMetrics(ref);
if (!allErrors.isEmpty()) {
throw new IOException("Insert failed: " + allErrors);
} else {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
index 0375cf9ab33..392a1d16404 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -45,12 +46,14 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
*/
public class BigQuerySinkMetrics {
private static boolean supportMetricsDeletion = false;
+ private static boolean supportStreamingInsertsMetrics = false;
public static final String METRICS_NAMESPACE = "BigQuerySink";
// Status codes
- private static final String UNKNOWN = Status.Code.UNKNOWN.toString();
+ public static final String UNKNOWN = Status.Code.UNKNOWN.toString();
public static final String OK = Status.Code.OK.toString();
+ static final String INTERNAL = "INTERNAL";
public static final String PAYLOAD_TOO_LARGE = "PayloadTooLarge";
// Base Metric names
@@ -59,8 +62,9 @@ public class BigQuerySinkMetrics {
private static final String APPEND_ROWS_ROW_STATUS = "RowsAppendedCount";
public static final String THROTTLED_TIME = "ThrottledTime";
- // StorageWriteAPI Method names
+ // BigQuery Write Method names
public enum RpcMethod {
+ STREAMING_INSERTS,
APPEND_ROWS,
FLUSH_ROWS,
FINALIZE_STREAM
@@ -167,8 +171,8 @@ public class BigQuerySinkMetrics {
* 'RpcRequests-Method:{method}RpcStatus:{status};TableId:{tableId}'
TableId label is dropped
* if 'supportsMetricsDeletion' is not enabled.
*/
- private static Counter createRPCRequestCounter(
- RpcMethod method, String rpcStatus, String tableId) {
+ @VisibleForTesting
+ static Counter createRPCRequestCounter(RpcMethod method, String rpcStatus,
String tableId) {
NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
metricLabels.put(RPC_STATUS_LABEL, rpcStatus);
metricLabels.put(RPC_METHOD, method.toString());
@@ -189,7 +193,7 @@ public class BigQuerySinkMetrics {
* @param method StorageWriteAPI method associated with this metric.
* @return Histogram with exponential buckets with a sqrt(2) growth factor.
*/
- private static Histogram createRPCLatencyHistogram(RpcMethod method) {
+ static Histogram createRPCLatencyHistogram(RpcMethod method) {
NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
metricLabels.put(RPC_METHOD, method.toString());
String fullMetricName = createLabeledMetricName(RPC_LATENCY, metricLabels);
@@ -326,6 +330,22 @@ public class BigQuerySinkMetrics {
updateRpcLatencyMetric(c, method);
}
+ /**
+ * Returns a container to store metrics for BigQuery's {@code Streaming
Inserts} RPC. If these
+ * metrics are disabled, then we return a no-op container.
+ */
+ static StreamingInsertsMetrics streamingInsertsMetrics() {
+ if (supportStreamingInsertsMetrics) {
+ return StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+ } else {
+ return StreamingInsertsMetrics.NoOpStreamingInsertsMetrics.getInstance();
+ }
+ }
+
+ public static void setSupportStreamingInsertsMetrics(boolean
supportStreamingInsertsMetrics) {
+ BigQuerySinkMetrics.supportStreamingInsertsMetrics =
supportStreamingInsertsMetrics;
+ }
+
public static void setSupportMetricsDeletion(boolean supportMetricsDeletion)
{
BigQuerySinkMetrics.supportMetricsDeletion = supportMetricsDeletion;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
new file mode 100644
index 00000000000..998c9af945b
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetrics.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.metrics.Histogram;
+import org.apache.beam.sdk.values.KV;
+
+/** Stores and exports metrics for a batch of Streaming Inserts RPCs. */
+public interface StreamingInsertsMetrics {
+
+ void updateRetriedRowsWithStatus(String status, int retriedRows);
+
+ void updateFailedRpcMetrics(Instant start, Instant end, String status);
+
+ void updateSuccessfulRpcMetrics(Instant start, Instant end);
+
+ void incrementFailedRows();
+
+ void updateSuccessfulAndFailedRows(int totalRows, int failedRows);
+
+ void updateStreamingInsertsMetrics(@Nullable TableReference tableRef);
+
+ /** No-op implementation of {@code StreamingInsertsResults}. */
+ class NoOpStreamingInsertsMetrics implements StreamingInsertsMetrics {
+ private NoOpStreamingInsertsMetrics() {}
+
+ @Override
+ public void updateRetriedRowsWithStatus(String status, int retriedRows) {}
+
+ @Override
+ public void updateFailedRpcMetrics(Instant start, Instant end, String
status) {}
+
+ @Override
+ public void updateSuccessfulRpcMetrics(Instant start, Instant end) {}
+
+ @Override
+ public void incrementFailedRows() {}
+
+ @Override
+ public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {}
+
+ @Override
+ public void updateStreamingInsertsMetrics(@Nullable TableReference
tableRef) {}
+
+ private static NoOpStreamingInsertsMetrics singleton = new
NoOpStreamingInsertsMetrics();
+
+ static NoOpStreamingInsertsMetrics getInstance() {
+ return singleton;
+ }
+ }
+
+ /**
+ * Metrics of a batch of InsertAll RPCs. Member variables are thread safe;
however, this class
+ * does not have atomicity across member variables.
+ *
+ * <p>Expected usage: A number of threads record metrics in an instance of
this class with the
+ * member methods. Afterwards, a single thread should call {@code
updateStreamingInsertsMetrics}
+ * which will export all counters metrics and RPC latency distribution
metrics to the underlying
+ * {@code perWorkerMetrics} container. Afterwards, metrics should not be
written/read from this
+ * object.
+ */
+ @AutoValue
+ abstract class StreamingInsertsMetricsImpl implements
StreamingInsertsMetrics {
+ abstract ConcurrentLinkedQueue<java.time.Duration> rpcLatencies();
+
+ abstract ConcurrentLinkedQueue<String> rpcErrorStatus();
+ // Represents <Rpc Status, Number of Rows> for rows that are retried
because of a failed
+ // InsertAll RPC.
+ abstract ConcurrentLinkedQueue<KV<String, Integer>> retriedRowsByStatus();
+
+ abstract AtomicInteger successfulRpcsCount();
+
+ abstract AtomicInteger successfulRowsCount();
+
+ abstract AtomicInteger failedRowsCount();
+
+ abstract AtomicBoolean isWritable();
+
+ public static StreamingInsertsMetricsImpl create() {
+ return new AutoValue_StreamingInsertsMetrics_StreamingInsertsMetricsImpl(
+ new ConcurrentLinkedQueue<>(),
+ new ConcurrentLinkedQueue<>(),
+ new ConcurrentLinkedQueue<>(),
+ new AtomicInteger(),
+ new AtomicInteger(),
+ new AtomicInteger(),
+ new AtomicBoolean(true));
+ }
+
+ /** Update metrics for rows that were retried due to an RPC error. */
+ @Override
+ public void updateRetriedRowsWithStatus(String status, int retriedRows) {
+ if (isWritable().get()) {
+ retriedRowsByStatus().add(KV.of(status, retriedRows));
+ }
+ }
+
+ /** Record the rpc status and latency of a failed StreamingInserts RPC
call. */
+ @Override
+ public void updateFailedRpcMetrics(Instant start, Instant end, String
status) {
+ if (isWritable().get()) {
+ rpcErrorStatus().add(status);
+ rpcLatencies().add(Duration.between(start, end));
+ }
+ }
+
+ /** Record the rpc status and latency of a successful StreamingInserts RPC
call. */
+ @Override
+ public void updateSuccessfulRpcMetrics(Instant start, Instant end) {
+ if (isWritable().get()) {
+ successfulRpcsCount().getAndIncrement();
+ rpcLatencies().add(Duration.between(start, end));
+ }
+ }
+
+ /** Increment the failed rows count by one. */
+ @Override
+ public void incrementFailedRows() {
+ if (isWritable().get()) {
+ failedRowsCount().getAndIncrement();
+ }
+ }
+
+ /** Increment the failed rows count, and set the successful rows count. */
+ @Override
+ public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {
+ if (isWritable().get()) {
+ failedRowsCount().getAndAdd(failedRows);
+ successfulRowsCount().set(totalRows - failedRowsCount().get());
+ }
+ }
+
+ /**
+ * Export all metrics recorded in this instance to the underlying {@code
perWorkerMetrics}
+ * containers. This function will only report metrics once per instance.
Subsequent calls to
+ * this function will no-op.
+ *
+ * @param tableRef BigQuery table that was written to, return early if
null.
+ */
+ @Override
+ public void updateStreamingInsertsMetrics(@Nullable TableReference
tableRef) {
+ if (!isWritable().compareAndSet(true, false)) {
+ // Metrics have already been exported.
+ return;
+ }
+
+ if (tableRef == null) {
+ return;
+ }
+
+ String shortTableId =
+ String.join("/", "datasets", tableRef.getDatasetId(), "tables",
tableRef.getTableId());
+
+ Map<String, Integer> rpcRequetsRpcStatusMap = new HashMap<>();
+ Map<String, Integer> retriedRowsRpcStatusMap = new HashMap<>();
+
+ for (String status : rpcErrorStatus()) {
+ Integer currentVal = rpcRequetsRpcStatusMap.getOrDefault(status, 0);
+ rpcRequetsRpcStatusMap.put(status, currentVal + 1);
+ }
+
+ for (KV<String, Integer> retryCountByStatus : retriedRowsByStatus()) {
+ Integer currentVal =
retriedRowsRpcStatusMap.getOrDefault(retryCountByStatus.getKey(), 0);
+ retriedRowsRpcStatusMap.put(
+ retryCountByStatus.getKey(), currentVal +
retryCountByStatus.getValue());
+ }
+
+ for (Entry<String, Integer> entry : rpcRequetsRpcStatusMap.entrySet()) {
+ BigQuerySinkMetrics.createRPCRequestCounter(
+ BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS,
entry.getKey(), shortTableId)
+ .inc(entry.getValue());
+ }
+
+ for (Entry<String, Integer> entry : retriedRowsRpcStatusMap.entrySet()) {
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.RETRIED, entry.getKey(),
shortTableId)
+ .inc(entry.getValue());
+ }
+
+ if (successfulRpcsCount().get() != 0) {
+ BigQuerySinkMetrics.createRPCRequestCounter(
+ BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS,
+ BigQuerySinkMetrics.OK,
+ shortTableId)
+ .inc(successfulRpcsCount().longValue());
+ }
+
+ if (failedRowsCount().get() != 0) {
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.FAILED,
BigQuerySinkMetrics.INTERNAL, shortTableId)
+ .inc(failedRowsCount().longValue());
+ }
+
+ if (successfulRowsCount().get() != 0) {
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.SUCCESSFUL,
BigQuerySinkMetrics.OK, shortTableId)
+ .inc(successfulRowsCount().longValue());
+ }
+
+ Histogram latencyHistogram =
+ BigQuerySinkMetrics.createRPCLatencyHistogram(
+ BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
+ for (Duration latency : rpcLatencies()) {
+ latencyHistogram.update(latency.toMillis());
+ }
+ }
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index a8e1ad52237..eb4cff1a596 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
@@ -97,6 +98,7 @@ import
org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics.RowStatus;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -832,6 +834,167 @@ public class BigQueryServicesImplTest {
verifyWriteMetricWasSet("project", "dataset", "table", "quotaexceeded", 1);
}
+ /**
+ * Tests that {@link DatasetServiceImpl#insertAll} sets perWorkerMetrics
properly when the Backoff
+ * limit is hit.
+ */
+ @Test
+ public void testInsertAll_InternalBigQueryErrors() throws Exception {
+ BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
+ BigQuerySinkMetricsTest.TestMetricsContainer testMetricsContainer =
+ new BigQuerySinkMetricsTest.TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testMetricsContainer);
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+
+ TableReference ref =
+ new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+ ImmutableList.of(
+ wrapValue(new TableRow().set("row", "a")), wrapValue(new
TableRow().set("row", "b")));
+ List<String> insertIds = ImmutableList.of("a", "b");
+
+ final TableDataInsertAllResponse aFailed =
+ new TableDataInsertAllResponse()
+ .setInsertErrors(
+ ImmutableList.of(
+ new
InsertErrors().setIndex(0L).setErrors(ImmutableList.of(new ErrorProto()))));
+ MockSetupFunction aFailedResponse =
+ response -> {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContent()).thenReturn(toStream(aFailed));
+ };
+
+ // Expect 4 InsertAll RPCS:
+ // 1st request will return an RPC error that should be retried
+ // 2nd-4th RPCs will fail to insert Row A.
+ // Expect client throw an exception after 4th RPC due to hitting backoff
limits.
+ setupMockResponses(
+ response -> {
+ when(response.getStatusCode()).thenReturn(403);
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getContent())
+
.thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)));
+ },
+ aFailedResponse,
+ aFailedResponse,
+ aFailedResponse,
+ aFailedResponse);
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ try {
+ dataService.insertAll(
+ ref,
+ rows,
+ insertIds,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ TEST_BACKOFF,
+ new MockSleeper(),
+ InsertRetryPolicy.alwaysRetry(),
+ null,
+ null,
+ false,
+ false,
+ false,
+ null);
+ fail();
+ } catch (Exception e) {
+ }
+ verifyAllResponsesAreRead();
+
+ String tableId = "datasets/dataset/tables/table";
+ BigQuerySinkMetrics.RpcMethod method =
BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS;
+ MetricName okRpcRequestName =
+ BigQuerySinkMetrics.createRPCRequestCounter(method, "OK",
tableId).getName();
+ MetricName rateLimitRpcRequestName =
+ BigQuerySinkMetrics.createRPCRequestCounter(method,
"rateLimitExceeded", tableId).getName();
+
+ MetricName okRowsAppendedName =
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(RowStatus.SUCCESSFUL,
"OK", tableId)
+ .getName();
+ MetricName internalRowsAppendedName =
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(RowStatus.RETRIED,
"INTERNAL", tableId)
+ .getName();
+ MetricName rateLimitRowsAppendedName =
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ RowStatus.RETRIED, "rateLimitExceeded", tableId)
+ .getName();
+ MetricName failedRowsName =
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(RowStatus.FAILED,
"INTERNAL", tableId)
+ .getName();
+
+ testMetricsContainer.assertPerWorkerCounterValue(okRpcRequestName, 4L);
+ testMetricsContainer.assertPerWorkerCounterValue(rateLimitRpcRequestName,
1L);
+ testMetricsContainer.assertPerWorkerCounterValue(okRowsAppendedName, 1L);
+ testMetricsContainer.assertPerWorkerCounterValue(internalRowsAppendedName,
3L);
+
testMetricsContainer.assertPerWorkerCounterValue(rateLimitRowsAppendedName, 2L);
+ testMetricsContainer.assertPerWorkerCounterValue(failedRowsName, 1L);
+
+ assertThat(testMetricsContainer.perWorkerHistograms.size(), equalTo(1));
+ }
+
+ /**
+ * Tests that {@link DatasetServiceImpl#insertAll} sets perWorkerMetrics
properly when BigQuery
+ * returns unretryable Rpc Error.
+ */
+ @Test
+ public void testInsertAll_RpcErrors() throws Exception {
+ BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
+ BigQuerySinkMetricsTest.TestMetricsContainer testMetricsContainer =
+ new BigQuerySinkMetricsTest.TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testMetricsContainer);
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+
+ TableReference ref =
+ new
TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+ ImmutableList.of(
+ wrapValue(new TableRow().set("row", "a")), wrapValue(new
TableRow().set("row", "b")));
+ List<String> insertIds = ImmutableList.of("a", "b");
+
+ // Expect 1 RPC request
+ // 1st request will return an RPC error that is not retryable.
+ // Expect client to record metrics and rethrow the error.
+ setupMockResponses(
+ response -> {
+ when(response.getStatusCode()).thenReturn(403);
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getContent())
+
.thenReturn(toStream(errorWithReasonAndStatus("actuallyForbidden", 403)));
+ });
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ try {
+ dataService.insertAll(
+ ref,
+ rows,
+ insertIds,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ TEST_BACKOFF,
+ new MockSleeper(),
+ InsertRetryPolicy.alwaysRetry(),
+ null,
+ null,
+ false,
+ false,
+ false,
+ null);
+ fail();
+ } catch (Exception e) {
+ }
+ verifyAllResponsesAreRead();
+
+ String tableId = "datasets/dataset/tables/table";
+ BigQuerySinkMetrics.RpcMethod method =
BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS;
+ MetricName forbiddenRpcName =
+ BigQuerySinkMetrics.createRPCRequestCounter(method,
"actuallyForbidden", tableId).getName();
+
+ testMetricsContainer.assertPerWorkerCounterValue(forbiddenRpcName, 1L);
+ assertThat(testMetricsContainer.perWorkerHistograms.size(), equalTo(1));
+ }
+
// A BackOff that makes a total of 4 attempts
private static final FluentBackoff TEST_BACKOFF =
FluentBackoff.DEFAULT
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
index 8b960a8c6c6..861378a41f8 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java
@@ -20,11 +20,14 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.sameInstance;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import io.grpc.Status;
import java.time.Instant;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -96,6 +99,21 @@ public class BigQuerySinkMetricsTest {
perWorkerHistograms.clear();
perWorkerCounters.clear();
}
+
+ public void assertPerWorkerCounterValue(MetricName name, long value)
throws Exception {
+ assertThat(perWorkerCounters, IsMapContaining.hasKey(name));
+ assertThat(perWorkerCounters.get(name).getCumulative(), equalTo(value));
+ }
+
+ public void assertPerWorkerHistogramValues(
+ MetricName name, HistogramData.BucketType bucketType, double...
values) {
+ KV<MetricName, HistogramData.BucketType> kv = KV.of(name, bucketType);
+ assertThat(perWorkerHistograms, IsMapContaining.hasKey(kv));
+
+ Double[] objValues =
Arrays.stream(values).boxed().toArray(Double[]::new);
+
+ assertThat(perWorkerHistograms.get(kv).values,
containsInAnyOrder(objValues));
+ }
}
@Test
@@ -359,4 +377,20 @@ public class BigQuerySinkMetricsTest {
public void testParseMetricName_emptyString() {
assertThat(BigQuerySinkMetrics.parseMetricName("").isPresent(),
equalTo(false));
}
+
+ @Test
+ public void testStreamingInsertsMetrics_disabled() {
+ BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(false);
+ assertThat(
+ BigQuerySinkMetrics.streamingInsertsMetrics(),
+
sameInstance(StreamingInsertsMetrics.NoOpStreamingInsertsMetrics.getInstance()));
+ }
+
+ @Test
+ public void testStreamingInsertsMetrics_enabled() {
+ BigQuerySinkMetrics.setSupportStreamingInsertsMetrics(true);
+ assertThat(
+ BigQuerySinkMetrics.streamingInsertsMetrics(),
+ instanceOf(StreamingInsertsMetrics.StreamingInsertsMetricsImpl.class));
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
new file mode 100644
index 00000000000..e04870c531f
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInsertsMetricsTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import com.google.api.services.bigquery.model.TableReference;
+import java.time.Duration;
+import java.time.Instant;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics.RowStatus;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.util.HistogramData;
+import org.junit.Test;
+
+public class StreamingInsertsMetricsTest {
+
+ @Test
+ public void testNoOpStreamingInsertsMetrics() throws Exception {
+ BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+ new BigQuerySinkMetricsTest.TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testContainer);
+
+ StreamingInsertsMetrics results =
+ StreamingInsertsMetrics.NoOpStreamingInsertsMetrics.getInstance();
+ results.updateRetriedRowsWithStatus("INTERNAL", 0);
+ Instant t1 = Instant.now();
+ results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
+ TableReference ref = new
TableReference().setTableId("t").setDatasetId("d");
+
+ results.updateStreamingInsertsMetrics(ref);
+
+ assertThat(testContainer.perWorkerCounters.size(), equalTo(0));
+ assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
+ }
+
+ @Test
+ public void testUpdateStreamingInsertsMetrics_nullInput() throws Exception {
+ BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+ new BigQuerySinkMetricsTest.TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testContainer);
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+
+ StreamingInsertsMetrics results =
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+ results.updateRetriedRowsWithStatus("INTERNAL", 0);
+ Instant t1 = Instant.now();
+ results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
+
+ results.updateStreamingInsertsMetrics(null);
+
+ assertThat(testContainer.perWorkerCounters.size(), equalTo(0));
+ assertThat(testContainer.perWorkerHistograms.size(), equalTo(0));
+ }
+
+ MetricName getAppendRowsCounterName(
+ BigQuerySinkMetrics.RowStatus rowStatus, String rpcStatus, String
tableId) {
+ return BigQuerySinkMetrics.appendRowsRowStatusCounter(rowStatus,
rpcStatus, tableId).getName();
+ }
+
+ MetricName getRpcRequestsCounterName(
+ BigQuerySinkMetrics.RpcMethod method, String rpcStatus, String tableId) {
+ return BigQuerySinkMetrics.createRPCRequestCounter(method, rpcStatus,
tableId).getName();
+ }
+
+ @Test
+ public void testUpdateStreamingInsertsMetrics_rowsAppendedCounter() throws
Exception {
+ BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+ new BigQuerySinkMetricsTest.TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testContainer);
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+ TableReference ref = new
TableReference().setTableId("t").setDatasetId("d");
+
+ StreamingInsertsMetrics results =
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+ results.updateRetriedRowsWithStatus("INTERNAL", 10);
+ results.updateSuccessfulAndFailedRows(50, 30);
+ results.updateRetriedRowsWithStatus("QuotaLimits", 10);
+ results.updateRetriedRowsWithStatus("QuotaLimits", 5);
+ results.updateRetriedRowsWithStatus("ServiceUnavailable", 5);
+
+ results.updateStreamingInsertsMetrics(ref);
+
+ String tableId = "datasets/d/tables/t";
+ MetricName internalErrorRetriedMetricName =
+ getAppendRowsCounterName(RowStatus.RETRIED, "INTERNAL", tableId);
+ MetricName succssfulRowsMetricName =
+ getAppendRowsCounterName(RowStatus.SUCCESSFUL, "OK", tableId);
+ MetricName failedRowsMetricName =
+ getAppendRowsCounterName(RowStatus.FAILED, "INTERNAL", tableId);
+ MetricName retriedRowsQuotaMetricName =
+ getAppendRowsCounterName(RowStatus.RETRIED, "QuotaLimits", tableId);
+ MetricName retriedRowsUnavailableMetricName =
+ getAppendRowsCounterName(RowStatus.RETRIED, "ServiceUnavailable",
tableId);
+
+ testContainer.assertPerWorkerCounterValue(internalErrorRetriedMetricName,
10L);
+ testContainer.assertPerWorkerCounterValue(succssfulRowsMetricName, 20L);
+ testContainer.assertPerWorkerCounterValue(failedRowsMetricName, 30L);
+ testContainer.assertPerWorkerCounterValue(retriedRowsQuotaMetricName, 15L);
+
testContainer.assertPerWorkerCounterValue(retriedRowsUnavailableMetricName, 5L);
+ }
+
+ @Test
+ public void testUpdateStreamingInsertsMetrics_rpcLatencyHistogram() throws
Exception {
+ BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+ new BigQuerySinkMetricsTest.TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testContainer);
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+ TableReference ref = new
TableReference().setTableId("t").setDatasetId("d");
+
+ StreamingInsertsMetrics results =
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+ Instant t1 = Instant.now();
+ results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
+ results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(20)));
+ results.updateFailedRpcMetrics(t1, t1.plus(Duration.ofMillis(30)),
"PermissionDenied");
+ results.updateFailedRpcMetrics(t1, t1.plus(Duration.ofMillis(40)),
"Unavailable");
+
+ results.updateStreamingInsertsMetrics(ref);
+
+ // Validate RPC latency metric.
+ MetricName histogramName =
+ MetricName.named("BigQuerySink",
"RpcLatency*rpc_method:STREAMING_INSERTS;");
+ HistogramData.BucketType bucketType =
HistogramData.ExponentialBuckets.of(1, 34);
+ testContainer.assertPerWorkerHistogramValues(histogramName, bucketType,
10.0, 20.0, 30.0, 40.0);
+
+ // Validate RPC Status metric.
+ BigQuerySinkMetrics.RpcMethod m =
BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS;
+ String tableId = "datasets/d/tables/t";
+ MetricName okMetricName = getRpcRequestsCounterName(m, "OK", tableId);
+ MetricName permissionDeniedMetricName =
+ getRpcRequestsCounterName(m, "PermissionDenied", tableId);
+ MetricName unavailableMetricName = getRpcRequestsCounterName(m,
"Unavailable", tableId);
+
+ testContainer.assertPerWorkerCounterValue(okMetricName, 2L);
+ testContainer.assertPerWorkerCounterValue(permissionDeniedMetricName, 1L);
+ testContainer.assertPerWorkerCounterValue(unavailableMetricName, 1L);
+ }
+
+ @Test
+ public void
testUpdateStreamingInsertsMetrics_multipleUpdateStreamingInsertsMetrics()
+ throws Exception {
+ BigQuerySinkMetricsTest.TestMetricsContainer testContainer =
+ new BigQuerySinkMetricsTest.TestMetricsContainer();
+ MetricsEnvironment.setCurrentContainer(testContainer);
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
+ TableReference ref = new
TableReference().setTableId("t").setDatasetId("d");
+
+ StreamingInsertsMetrics results =
StreamingInsertsMetrics.StreamingInsertsMetricsImpl.create();
+ results.updateRetriedRowsWithStatus("INTERNAL", 10);
+
+ results.updateStreamingInsertsMetrics(ref);
+
+ String tableId = "datasets/d/tables/t";
+ MetricName internalErrorRetriedMetricName =
+ getAppendRowsCounterName(RowStatus.RETRIED, "INTERNAL", tableId);
+
+ testContainer.assertPerWorkerCounterValue(internalErrorRetriedMetricName,
10L);
+
+ // Subsequent updates to this object should update the underlying metrics.
+ results.updateRetriedRowsWithStatus("INTERNAL", 10);
+ results.updateStreamingInsertsMetrics(ref);
+
+ testContainer.assertPerWorkerCounterValue(internalErrorRetriedMetricName,
10L);
+ }
+}