This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 86db595 [BEAM-11994] Update BigQueryStorageStreamSource and
BigQueryServicesImpl to capture API_REQUEST_COUNT metrics/errors for storage
API reads
new 9673d8b Merge pull request #15117 from ajamato/bq_java_read_metrics
86db595 is described below
commit 86db595d3c19a80d7eaa9bfbd20739afae20c7c6
Author: Alex Amato <[email protected]>
AuthorDate: Mon Jun 21 14:20:17 2021 -0700
[BEAM-11994] Update BigQueryStorageStreamSource and BigQueryServicesImpl to
capture API_REQUEST_COUNT metrics/errors for storage API reads
---
.../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 11 +-
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 80 +++++++---
.../gcp/bigquery/BigQueryStorageQuerySource.java | 5 +
.../io/gcp/bigquery/BigQueryStorageSourceBase.java | 15 +-
.../gcp/bigquery/BigQueryStorageStreamSource.java | 31 +++-
.../gcp/bigquery/BigQueryStorageTableSource.java | 32 +++-
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 102 +++++++++++++
.../gcp/bigquery/BigQueryIOStorageQueryTest.java | 2 +-
.../io/gcp/bigquery/BigQueryIOStorageReadTest.java | 66 ++++----
.../io/gcp/bigquery/BigQueryServicesImplTest.java | 167 ++++++++++++++++++++-
.../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 80 ++++++++++
11 files changed, 531 insertions(+), 60 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index f7b95e7..29d5c00 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -231,14 +231,23 @@ public interface BigQueryServices extends Serializable {
/** An interface representing a client object for making calls to the
BigQuery Storage API. */
interface StorageClient extends AutoCloseable {
- /** Create a new read session against an existing table. */
+ /**
+ * Create a new read session against an existing table. This method variant collects the
+ * request count metric, using the table id found in the request.
+ */
ReadSession createReadSession(CreateReadSessionRequest request);
/** Read rows in the context of a specific read stream. */
BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request);
+ /* This method variant collects request count metric, using the
fullTableID metadata. */
+ BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request,
String fullTableId);
+
SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request);
+ /* This method variant collects request count metric, using the
fullTableID metadata. */
+ SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request,
String fullTableId);
+
/**
* Close the client object.
*
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index d788bfe..951f1fd 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -29,6 +29,7 @@ import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
@@ -100,7 +101,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
@@ -843,22 +843,7 @@ class BigQueryServicesImpl implements BigQueryServices {
idsToPublish = insertIdList;
}
- HashMap<String, String> baseLabels = new HashMap<String, String>();
- // TODO(ajamato): Add Ptransform label. Populate it as empty for now to
prevent the
- // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
- baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
- baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
- baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"BigQueryBatchWrite");
- baseLabels.put(
- MonitoringInfoConstants.Labels.RESOURCE,
- GcpResourceIdentifiers.bigQueryTable(
- ref.getProjectId(), ref.getDatasetId(), ref.getTableId()));
- baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID,
ref.getProjectId());
- baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET,
ref.getDatasetId());
- baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE,
ref.getTableId());
-
- ServiceCallMetric serviceCallMetric =
- new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+ ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
while (true) {
List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new
ArrayList<>();
@@ -1363,9 +1348,31 @@ class BigQueryServicesImpl implements BigQueryServices {
this.client = BigQueryReadClient.create(settingsBuilder.build());
}
+ // Since BigQueryReadClient's methods are final, they cannot be mocked with Mockito for
+ // testing.
+ // So this wrapper method can be mocked in tests, instead.
+ ReadSession callCreateReadSession(CreateReadSessionRequest request) {
+ return client.createReadSession(request);
+ }
+
@Override
public ReadSession createReadSession(CreateReadSessionRequest request) {
- return client.createReadSession(request);
+ TableReference tableReference =
+ BigQueryUtils.toTableReference(request.getReadSession().getTable());
+ ServiceCallMetric serviceCallMetric =
BigQueryUtils.readCallMetric(tableReference);
+ try {
+ ReadSession session = callCreateReadSession(request);
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call("ok");
+ }
+ return session;
+
+ } catch (ApiException e) {
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call(e.getStatusCode().getCode().name());
+ }
+ throw e;
+ }
}
@Override
@@ -1374,11 +1381,48 @@ class BigQueryServicesImpl implements BigQueryServices {
}
@Override
+ public BigQueryServerStream<ReadRowsResponse> readRows(
+ ReadRowsRequest request, String fullTableId) {
+ TableReference tableReference =
BigQueryUtils.toTableReference(fullTableId);
+ ServiceCallMetric serviceCallMetric =
BigQueryUtils.readCallMetric(tableReference);
+ try {
+ BigQueryServerStream<ReadRowsResponse> response = readRows(request);
+ serviceCallMetric.call("ok");
+ return response;
+ } catch (ApiException e) {
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call(e.getStatusCode().getCode().name());
+ }
+ throw e;
+ }
+ }
+
+ @Override
public SplitReadStreamResponse splitReadStream(SplitReadStreamRequest
request) {
return client.splitReadStream(request);
}
@Override
+ public SplitReadStreamResponse splitReadStream(
+ SplitReadStreamRequest request, String fullTableId) {
+ TableReference tableReference =
BigQueryUtils.toTableReference(fullTableId);
+ ServiceCallMetric serviceCallMetric =
BigQueryUtils.readCallMetric(tableReference);
+ try {
+ SplitReadStreamResponse response = splitReadStream(request);
+
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call("ok");
+ }
+ return response;
+ } catch (ApiException e) {
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call(e.getStatusCode().getCode().name());
+ }
+ throw e;
+ }
+ }
+
+ @Override
public void close() {
client.close();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 712456a..120adc1 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -175,4 +175,9 @@ class BigQueryStorageQuerySource<T> extends
BigQueryStorageSourceBase<T> {
kmsKey);
return bqServices.getDatasetService(options).getTable(queryResultTable);
}
+
+ @Override
+ protected @Nullable String getTargetTableId(BigQueryOptions options) throws
Exception {
+ return null;
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index ac8f2ae..504ec9e 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -94,6 +94,8 @@ abstract class BigQueryStorageSourceBase<T> extends
BoundedSource<T> {
*/
protected abstract Table getTargetTable(BigQueryOptions options) throws
Exception;
+ protected abstract @Nullable String getTargetTableId(BigQueryOptions
options) throws Exception;
+
@Override
public Coder<T> getOutputCoder() {
return outputCoder;
@@ -105,9 +107,16 @@ abstract class BigQueryStorageSourceBase<T> extends
BoundedSource<T> {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
Table targetTable = getTargetTable(bqOptions);
- ReadSession.Builder readSessionBuilder =
- ReadSession.newBuilder()
-
.setTable(BigQueryHelpers.toTableResourceName(targetTable.getTableReference()));
+ String tableReferenceId = "";
+ if (targetTable != null) {
+ tableReferenceId =
BigQueryHelpers.toTableResourceName(targetTable.getTableReference());
+ } else {
+ // If the table does not exist targetTable will be null.
+ // Construct the table id if we can generate it. For error
recording/logging.
+ tableReferenceId = getTargetTableId(bqOptions);
+ }
+
+ ReadSession.Builder readSessionBuilder =
ReadSession.newBuilder().setTable(tableReferenceId);
if (selectedFieldsProvider != null || rowRestrictionProvider != null) {
ReadSession.TableReadOptions.Builder tableReadOptionsBuilder =
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index 1a7bc59..462c720 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -21,7 +21,9 @@ import static
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FailedPreconditionException;
+import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
@@ -33,6 +35,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
@@ -162,6 +165,9 @@ class BigQueryStorageStreamSource<T> extends
BoundedSource<T> {
private long rowsConsumedFromCurrentResponse;
private long totalRowsInCurrentResponse;
+ private TableReference tableReference;
+ private ServiceCallMetric serviceCallMetric;
+
private BigQueryStorageStreamReader(
BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws
IOException {
this.source = source;
@@ -186,7 +192,9 @@ class BigQueryStorageStreamSource<T> extends
BoundedSource<T> {
.setOffset(currentOffset)
.build();
- responseStream = storageClient.readRows(request);
+ tableReference =
BigQueryUtils.toTableReference(source.readSession.getTable());
+ serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+ responseStream = storageClient.readRows(request,
source.readSession.getTable());
responseIterator = responseStream.iterator();
LOG.info("Started BigQuery Storage API read from stream {}.",
source.readStream.getName());
return readNextRecord();
@@ -205,7 +213,23 @@ class BigQueryStorageStreamSource<T> extends
BoundedSource<T> {
return false;
}
- ReadRowsResponse response = responseIterator.next();
+ ReadRowsResponse response;
+ try {
+ response = responseIterator.next();
+ // Since we don't have a direct hook to the underlying
+ // API call, record success every time we read a record successfully.
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call("ok");
+ }
+ } catch (ApiException e) {
+ // Occasionally the iterator will fail and raise an exception.
+ // Capture it here and record the error in the metric.
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call(e.getStatusCode().getCode().name());
+ }
+ throw e;
+ }
+
progressAtResponseStart =
response.getStats().getProgress().getAtResponseStart();
progressAtResponseEnd =
response.getStats().getProgress().getAtResponseEnd();
totalRowsInCurrentResponse = response.getRowCount();
@@ -315,7 +339,8 @@ class BigQueryStorageStreamSource<T> extends
BoundedSource<T> {
ReadRowsRequest.newBuilder()
.setReadStream(splitResponse.getPrimaryStream().getName())
.setOffset(currentOffset + 1)
- .build());
+ .build(),
+ source.readSession.getTable());
newResponseIterator = newResponseStream.iterator();
newResponseIterator.hasNext();
} catch (FailedPreconditionException e) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index 39ee5fb..2a14cd5 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -102,11 +102,39 @@ public class BigQueryStorageTableSource<T> extends
BigQueryStorageSourceBase<T>
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return getTargetTable(options.as(BigQueryOptions.class)).getNumBytes();
+ Table table = getTargetTable(options.as(BigQueryOptions.class));
+ if (table != null) {
+ return table.getNumBytes();
+ }
+ // If the table does not exist, then it will be null.
+ // Avoid the NullPointerException here, allow a more meaningful table
"not_found"
+ // error to be shown to the user, upon table read.
+ return 0;
+ }
+
+ @Override
+ protected String getTargetTableId(BigQueryOptions options) throws Exception {
+ TableReference tableReference = tableReferenceProvider.get();
+ if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+ checkState(
+ !Strings.isNullOrEmpty(options.getProject()),
+ "No project ID set in %s or %s, cannot construct a complete %s",
+ TableReference.class.getSimpleName(),
+ BigQueryOptions.class.getSimpleName(),
+ TableReference.class.getSimpleName());
+ LOG.info(
+ "Project ID not set in {}. Using default project from {}.",
+ TableReference.class.getSimpleName(),
+ BigQueryOptions.class.getSimpleName());
+ tableReference.setProjectId(options.getProject());
+ }
+ return String.format(
+ "projects/%s/datasets/%s/tables/%s",
+ tableReference.getProjectId(), tableReference.getDatasetId(),
tableReference.getTableId());
}
@Override
- protected Table getTargetTable(BigQueryOptions options) throws Exception {
+ protected @Nullable Table getTargetTable(BigQueryOptions options) throws
Exception {
if (cachedTable.get() == null) {
TableReference tableReference = tableReferenceProvider.get();
if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 9166228..6025b22 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -24,6 +24,7 @@ import static java.util.stream.Collectors.toMap;
import static org.apache.beam.sdk.values.Row.toRow;
import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
@@ -34,17 +35,23 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.Schema;
@@ -75,6 +82,20 @@ import org.joda.time.format.DateTimeFormatterBuilder;
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class BigQueryUtils {
+
+ // For parsing the format returned on the API proto:
+ // google.cloud.bigquery.storage.v1.ReadSession.getTable()
+ // "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
+ private static final Pattern TABLE_RESOURCE_PATTERN =
+ Pattern.compile(
+
"^projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)$");
+
+ // For parsing the format used to refer to tables parameters in BigQueryIO.
+ // "{project_id}:{dataset_id}.{table_id}" or
+ // "{project_id}.{dataset_id}.{table_id}"
+ private static final Pattern SIMPLE_TABLE_PATTERN =
+
Pattern.compile("^(?<PROJECT>[^\\.:]+)[\\.:](?<DATASET>[^\\.:]+)[\\.](?<TABLE>[^\\.:]+)$");
+
/** Options for how to convert BigQuery data to Beam data. */
@AutoValue
public abstract static class ConversionOptions implements Serializable {
@@ -909,4 +930,85 @@ public class BigQueryUtils {
"Does not support converting avro format: " +
value.getClass().getName());
}
}
+
+ /**
+ * @param fullTableId - Is one of the forms commonly used to refer to bigquery tables in the
+ * beam codebase:
+ * <ul>
+ * <li>projects/{project_id}/datasets/{dataset_id}/tables/{table_id}
+ * <li>myproject:mydataset.mytable
+ * <li>myproject.mydataset.mytable
+ * </ul>
+ *
+ * @return a TableReference obtained by parsing the fullTableId. If it cannot be parsed
+ * properly null is returned.
+ public static @Nullable TableReference toTableReference(String fullTableId) {
+ // Try parsing the format:
+ // "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
+ Matcher m = TABLE_RESOURCE_PATTERN.matcher(fullTableId);
+ if (m.matches()) {
+ return new TableReference()
+ .setProjectId(m.group("PROJECT"))
+ .setDatasetId(m.group("DATASET"))
+ .setTableId(m.group("TABLE"));
+ }
+
+ // If that failed, try the format:
+ // "{project_id}:{dataset_id}.{table_id}" or
+ // "{project_id}.{dataset_id}.{table_id}"
+ m = SIMPLE_TABLE_PATTERN.matcher(fullTableId);
+ if (m.matches()) {
+ return new TableReference()
+ .setProjectId(m.group("PROJECT"))
+ .setDatasetId(m.group("DATASET"))
+ .setTableId(m.group("TABLE"));
+ }
+ return null;
+ }
+
+ private static ServiceCallMetric callMetricForMethod(
+ TableReference tableReference, String method) {
+ if (tableReference != null) {
+ // TODO(ajamato): Add Ptransform label. Populate it as empty for now to
prevent the
+ // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
+ HashMap<String, String> baseLabels = new HashMap<String, String>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD, method);
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigQueryTable(
+ tableReference.getProjectId(),
+ tableReference.getDatasetId(),
+ tableReference.getTableId()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID,
tableReference.getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGQUERY_DATASET,
tableReference.getDatasetId());
+ baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE,
tableReference.getTableId());
+ return new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+ }
+ return null;
+ }
+
+ /**
+ * @param tableReference - The table being read from. Can be a temporary BQ
table used to read
+ * from a SQL query.
+ * @return a ServiceCallMetric for recording statuses for all BQ API
responses related to reading
+ * elements directly from BigQuery in a process-wide metric. Such as:
calls to readRows,
+ * splitReadStream, createReadSession.
+ */
+ public static ServiceCallMetric readCallMetric(TableReference
tableReference) {
+ return callMetricForMethod(tableReference, "BigQueryBatchRead");
+ }
+
+ /**
+ * @param tableReference - The table being written to.
+ * @return a ServiceCallMetric for recording statuses for all BQ responses
related to writing
+ * elements directly to BigQuery in a process-wide metric. Such as:
insertAll.
+ */
+ public static ServiceCallMetric writeCallMetric(TableReference
tableReference) {
+ return callMetricForMethod(tableReference, "BigQueryBatchWrite");
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index 81e1027..86f538a 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -821,7 +821,7 @@ public class BigQueryIOStorageQueryTest {
StorageClient fakeStorageClient = mock(StorageClient.class,
withSettings().serializable());
when(fakeStorageClient.createReadSession(any())).thenReturn(readSession);
- when(fakeStorageClient.readRows(expectedReadRowsRequest))
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
BigQueryIO.TypedRead<KV<String, Long>> typedRead =
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index c39cd11..135a536 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -774,7 +774,7 @@ public class BigQueryIOStorageReadTest {
createResponse(AVRO_SCHEMA, records.subList(2, 3), 0.5, 0.75));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -830,7 +830,7 @@ public class BigQueryIOStorageReadTest {
createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.7, 1.0));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -899,7 +899,7 @@ public class BigQueryIOStorageReadTest {
createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.800, 0.875));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
when(fakeStorageClient.splitReadStream(
@@ -916,7 +916,7 @@ public class BigQueryIOStorageReadTest {
createResponse(AVRO_SCHEMA, records.subList(3, 4), 0.8, 1.0));
when(fakeStorageClient.readRows(
-
ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build()))
+
ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build(),
""))
.thenReturn(new FakeBigQueryServerStream<>(primaryResponses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -974,7 +974,7 @@ public class BigQueryIOStorageReadTest {
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
-
ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+
ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call.
@@ -993,11 +993,12 @@ public class BigQueryIOStorageReadTest {
// This test will read rows 0 and 1 from the parent before
calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses.subList(1,
2)));
when(fakeStorageClient.readRows(
-
ReadRowsRequest.newBuilder().setReadStream("remainderStream").build()))
+
ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), ""))
.thenReturn(
new FakeBigQueryServerStream<>(parentResponses.subList(2,
parentResponses.size())));
@@ -1051,7 +1052,7 @@ public class BigQueryIOStorageReadTest {
// Mock the initial ReadRows call.
when(fakeStorageClient.readRows(
-
ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build()))
+
ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build(),
""))
.thenReturn(
new FakeBigQueryServerStream<>(
Lists.newArrayList(
@@ -1091,7 +1092,8 @@ public class BigQueryIOStorageReadTest {
ReadRowsRequest.newBuilder()
.setReadStream(readStreams.get(1).getName())
.setOffset(1)
- .build()))
+ .build(),
+ ""))
.thenReturn(
new FakeBigQueryServerStream<>(
Lists.newArrayList(
@@ -1125,7 +1127,8 @@ public class BigQueryIOStorageReadTest {
ReadRowsRequest.newBuilder()
.setReadStream(readStreams.get(2).getName())
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenReturn(
new FakeBigQueryServerStream<>(
Lists.newArrayList(
@@ -1191,7 +1194,7 @@ public class BigQueryIOStorageReadTest {
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
-
ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+
ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and
remainder_stream means
@@ -1259,7 +1262,7 @@ public class BigQueryIOStorageReadTest {
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
-
ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+
ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and
remainder_stream means
@@ -1280,7 +1283,8 @@ public class BigQueryIOStorageReadTest {
// This test will read rows 0 and 1 from the parent before
calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenThrow(
new FailedPreconditionException(
"Given row offset is invalid for stream.",
@@ -1373,7 +1377,7 @@ public class BigQueryIOStorageReadTest {
StorageClient fakeStorageClient = mock(StorageClient.class,
withSettings().serializable());
when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
.thenReturn(readSession);
- when(fakeStorageClient.readRows(expectedReadRowsRequest))
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
PCollection<KV<String, Long>> output =
@@ -1439,7 +1443,7 @@ public class BigQueryIOStorageReadTest {
StorageClient fakeStorageClient = mock(StorageClient.class,
withSettings().serializable());
when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
.thenReturn(readSession);
- when(fakeStorageClient.readRows(expectedReadRowsRequest))
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
PCollection<TableRow> output =
@@ -1507,7 +1511,7 @@ public class BigQueryIOStorageReadTest {
StorageClient fakeStorageClient = mock(StorageClient.class,
withSettings().serializable());
when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
.thenReturn(readSession);
- when(fakeStorageClient.readRows(expectedReadRowsRequest))
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
PCollection<KV<String, Long>> output =
@@ -1553,7 +1557,7 @@ public class BigQueryIOStorageReadTest {
ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.5,
0.75));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -1602,7 +1606,7 @@ public class BigQueryIOStorageReadTest {
createResponseArrow(ARROW_SCHEMA, names.subList(4, 7),
values.subList(4, 7), 0.7, 1.0));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -1668,7 +1672,7 @@ public class BigQueryIOStorageReadTest {
ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7,
0.875));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponse));
when(fakeStorageClient.splitReadStream(
@@ -1685,7 +1689,7 @@ public class BigQueryIOStorageReadTest {
createResponseArrow(ARROW_SCHEMA, names.subList(3, 4),
values.subList(3, 4), 0.8, 1.0));
        when(fakeStorageClient.readRows(
-                ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build()))
+                ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build(), ""))
            .thenReturn(new FakeBigQueryServerStream<>(primaryResponses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -1734,7 +1738,7 @@ public class BigQueryIOStorageReadTest {
StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call.
@@ -1753,11 +1757,12 @@ public class BigQueryIOStorageReadTest {
// This test will read rows 0 and 1 from the parent before
calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses.subList(1,
2)));
    when(fakeStorageClient.readRows(
-            ReadRowsRequest.newBuilder().setReadStream("remainderStream").build()))
+            ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), ""))
.thenReturn(
new FakeBigQueryServerStream<>(parentResponses.subList(2,
parentResponses.size())));
@@ -1824,7 +1829,7 @@ public class BigQueryIOStorageReadTest {
// Mock the initial ReadRows call.
when(fakeStorageClient.readRows(
-
ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build()))
+
ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build(),
""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mock the first SplitReadStream call.
@@ -1850,7 +1855,8 @@ public class BigQueryIOStorageReadTest {
ReadRowsRequest.newBuilder()
.setReadStream(readStreams.get(1).getName())
.setOffset(1)
- .build()))
+ .build(),
+ ""))
.thenReturn(new FakeBigQueryServerStream<>(otherResponses));
// Mock the second SplitReadStream call.
@@ -1875,7 +1881,8 @@ public class BigQueryIOStorageReadTest {
ReadRowsRequest.newBuilder()
.setReadStream(readStreams.get(2).getName())
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenReturn(new FakeBigQueryServerStream<>(lastResponses));
BoundedSource<TableRow> source =
@@ -1929,7 +1936,7 @@ public class BigQueryIOStorageReadTest {
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
-
ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+
ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and
remainder_stream means
@@ -1994,7 +2001,7 @@ public class BigQueryIOStorageReadTest {
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
-
ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+
ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and
remainder_stream means
@@ -2015,7 +2022,8 @@ public class BigQueryIOStorageReadTest {
// This test will read rows 0 and 1 from the parent before
calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenThrow(
new FailedPreconditionException(
"Given row offset is invalid for stream.",
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 61b94f6..53d8db0 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -48,6 +49,8 @@ import
com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.util.MockSleeper;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
@@ -62,6 +65,12 @@ import
com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
+import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.RetryBoundedBackOff;
import java.io.ByteArrayInputStream;
@@ -69,6 +78,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
@@ -179,15 +189,15 @@ public class BigQueryServicesImplTest {
}
}
- private void verifyWriteMetricWasSet(
- String projectId, String dataset, String table, String status, long
count) {
+ private void verifyRequestMetricWasSet(
+ String method, String projectId, String dataset, String table, String
status, long count) {
// Verify the metric as reported.
HashMap<String, String> labels = new HashMap<String, String>();
// TODO(ajamato): Add Ptransform label. Populate it as empty for now to
prevent the
// SpecMonitoringInfoValidator from dropping the MonitoringInfo.
labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
- labels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
+ labels.put(MonitoringInfoConstants.Labels.METHOD, method);
labels.put(
MonitoringInfoConstants.Labels.RESOURCE,
GcpResourceIdentifiers.bigQueryTable(projectId, dataset, table));
@@ -203,6 +213,16 @@ public class BigQueryServicesImplTest {
assertEquals(count, (long) container.getCounter(name).getCumulative());
}
+ private void verifyWriteMetricWasSet(
+ String projectId, String dataset, String table, String status, long
count) {
+ verifyRequestMetricWasSet("BigQueryBatchWrite", projectId, dataset, table,
status, count);
+ }
+
+ private void verifyReadMetricWasSet(
+ String projectId, String dataset, String table, String status, long
count) {
+ verifyRequestMetricWasSet("BigQueryBatchRead", projectId, dataset, table,
status, count);
+ }
+
/** Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob}
succeeds. */
@Test
public void testStartLoadJobSucceeds() throws IOException,
InterruptedException {
@@ -1399,4 +1419,145 @@ public class BigQueryServicesImplTest {
assertThat(failedInserts, is(expected));
}
+
+ @Test
+ public void testCreateReadSessionSetsRequestCountMetric()
+ throws InterruptedException, IOException {
+ BigQueryServicesImpl.StorageClientImpl client =
+ mock(BigQueryServicesImpl.StorageClientImpl.class);
+
+ CreateReadSessionRequest.Builder builder =
CreateReadSessionRequest.newBuilder();
+ builder.getReadSessionBuilder().setTable("myproject:mydataset.mytable");
+ CreateReadSessionRequest request = builder.build();
+ when(client.callCreateReadSession(request))
+ .thenReturn(ReadSession.newBuilder().build()); // Mock implementation.
+ when(client.createReadSession(any())).thenCallRealMethod(); // Real
implementation.
+
+ client.createReadSession(request);
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "ok", 1);
+ }
+
+ @Test
+ public void testCreateReadSessionSetsRequestCountMetricOnError()
+ throws InterruptedException, IOException {
+ BigQueryServicesImpl.StorageClientImpl client =
+ mock(BigQueryServicesImpl.StorageClientImpl.class);
+
+ CreateReadSessionRequest.Builder builder =
CreateReadSessionRequest.newBuilder();
+ builder.getReadSessionBuilder().setTable("myproject:mydataset.mytable");
+ CreateReadSessionRequest request = builder.build();
+ StatusCode statusCode =
+ new StatusCode() {
+ @Override
+ public Code getCode() {
+ return Code.NOT_FOUND;
+ }
+
+ @Override
+ public Object getTransportCode() {
+ return null;
+ }
+ };
+ when(client.callCreateReadSession(request))
+ .thenThrow(new ApiException("Not Found", null, statusCode, false)); //
Mock implementation.
+ when(client.createReadSession(any())).thenCallRealMethod(); // Real
implementation.
+
+ thrown.expect(ApiException.class);
+ thrown.expectMessage("Not Found");
+
+ client.createReadSession(request);
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "not_found",
1);
+ }
+
+ @Test
+ public void testReadRowsSetsRequestCountMetric() throws
InterruptedException, IOException {
+ BigQueryServices.StorageClient client =
mock(BigQueryServicesImpl.StorageClientImpl.class);
+ ReadRowsRequest request = null;
+ BigQueryServices.BigQueryServerStream<ReadRowsResponse> response =
+ new BigQueryServices.BigQueryServerStream<ReadRowsResponse>() {
+ @Override
+ public Iterator<ReadRowsResponse> iterator() {
+ return null;
+ }
+
+ @Override
+ public void cancel() {}
+ };
+
+ when(client.readRows(request)).thenReturn(response); // Mock
implementation.
+ when(client.readRows(any(), any())).thenCallRealMethod(); // Real
implementation.
+
+ client.readRows(request, "myproject:mydataset.mytable");
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "ok", 1);
+ }
+
+ @Test
+ public void testReadRowsSetsRequestCountMetricOnError() throws
InterruptedException, IOException {
+ BigQueryServices.StorageClient client =
mock(BigQueryServicesImpl.StorageClientImpl.class);
+ ReadRowsRequest request = null;
+ StatusCode statusCode =
+ new StatusCode() {
+ @Override
+ public Code getCode() {
+ return Code.INTERNAL;
+ }
+
+ @Override
+ public Object getTransportCode() {
+ return null;
+ }
+ };
+ when(client.readRows(request))
+ .thenThrow(new ApiException("Internal", null, statusCode, false)); //
Mock implementation.
+ when(client.readRows(any(), any())).thenCallRealMethod(); // Real
implementation.
+
+ thrown.expect(ApiException.class);
+ thrown.expectMessage("Internal");
+
+ client.readRows(request, "myproject:mydataset.mytable");
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "internal", 1);
+ }
+
+ @Test
+ public void testSplitReadStreamSetsRequestCountMetric() throws
InterruptedException, IOException {
+ BigQueryServices.StorageClient client =
mock(BigQueryServicesImpl.StorageClientImpl.class);
+
+ SplitReadStreamRequest request = null;
+ when(client.splitReadStream(request))
+ .thenReturn(SplitReadStreamResponse.newBuilder().build()); // Mock
implementation.
+ when(client.splitReadStream(any(), any())).thenCallRealMethod(); // Real
implementation.
+
+ client.splitReadStream(request, "myproject:mydataset.mytable");
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "ok", 1);
+ }
+
+ @Test
+ public void testSplitReadStreamSetsRequestCountMetricOnError()
+ throws InterruptedException, IOException {
+ BigQueryServices.StorageClient client =
mock(BigQueryServicesImpl.StorageClientImpl.class);
+ SplitReadStreamRequest request = null;
+ StatusCode statusCode =
+ new StatusCode() {
+ @Override
+ public Code getCode() {
+ return Code.RESOURCE_EXHAUSTED;
+ }
+
+ @Override
+ public Object getTransportCode() {
+ return null;
+ }
+ };
+ when(client.splitReadStream(request))
+ .thenThrow(
+ new ApiException(
+ "Resource Exhausted", null, statusCode, false)); // Mock
implementation.
+ when(client.splitReadStream(any(), any())).thenCallRealMethod(); // Real
implementation.
+
+ thrown.expect(ApiException.class);
+ thrown.expectMessage("Resource Exhausted");
+
+ client.splitReadStream(request, "myproject:mydataset.mytable");
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable",
"resource_exhausted", 1);
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 75c1315..0782622 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -26,9 +26,11 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.math.BigDecimal;
@@ -882,4 +884,82 @@ public class BigQueryUtilsTest {
record, AVRO_ARRAY_ARRAY_TYPE,
BigQueryUtils.ConversionOptions.builder().build());
assertEquals(expected, beamRow);
}
+
+ @Test
+ public void testToTableReference() {
+ {
+ TableReference tr =
+
BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables/mytable");
+ assertEquals("myproject", tr.getProjectId());
+ assertEquals("mydataset", tr.getDatasetId());
+ assertEquals("mytable", tr.getTableId());
+ }
+
+ {
+ // Test colon(":") after project format
+ TableReference tr =
BigQueryUtils.toTableReference("myprojectwithcolon:mydataset.mytable");
+ assertEquals("myprojectwithcolon", tr.getProjectId());
+ assertEquals("mydataset", tr.getDatasetId());
+ assertEquals("mytable", tr.getTableId());
+ }
+
+ {
+ // Test dot(".") after project format
+ TableReference tr =
BigQueryUtils.toTableReference("myprojectwithdot.mydataset.mytable");
+ assertEquals("myprojectwithdot", tr.getProjectId());
+ assertEquals("mydataset", tr.getDatasetId());
+ assertEquals("mytable", tr.getTableId());
+ }
+
+ // Invalid scenarios
+ assertNull(BigQueryUtils.toTableReference(""));
+ assertNull(BigQueryUtils.toTableReference(":."));
+ assertNull(BigQueryUtils.toTableReference(".."));
+ assertNull(BigQueryUtils.toTableReference("myproject"));
+ assertNull(BigQueryUtils.toTableReference("myproject:"));
+ assertNull(BigQueryUtils.toTableReference("myproject."));
+ assertNull(BigQueryUtils.toTableReference("myproject:mydataset"));
+ assertNull(BigQueryUtils.toTableReference("myproject:mydataset."));
+ assertNull(BigQueryUtils.toTableReference("myproject:mydataset.mytable."));
+ assertNull(BigQueryUtils.toTableReference("myproject:mydataset:mytable:"));
+
assertNull(BigQueryUtils.toTableReference(".invalidleadingdot.mydataset.mytable"));
+
assertNull(BigQueryUtils.toTableReference("invalidtrailingdot.mydataset.mytable."));
+
assertNull(BigQueryUtils.toTableReference(":invalidleadingcolon.mydataset.mytable"));
+
assertNull(BigQueryUtils.toTableReference("invalidtrailingcolon.mydataset.mytable:"));
+
assertNull(BigQueryUtils.toTableReference("myproject.mydataset.mytable.myinvalidpart"));
+
assertNull(BigQueryUtils.toTableReference("myproject:mydataset.mytable.myinvalidpart"));
+
+ assertNull(
+
BigQueryUtils.toTableReference("/projects/extraslash/datasets/mydataset/tables/mytable"));
+ assertNull(
+
BigQueryUtils.toTableReference("projects//extraslash/datasets/mydataset/tables/mytable"));
+ assertNull(
+
BigQueryUtils.toTableReference("projects/extraslash//datasets/mydataset/tables/mytable"));
+ assertNull(
+
BigQueryUtils.toTableReference("projects/extraslash/datasets//mydataset/tables/mytable"));
+ assertNull(
+
BigQueryUtils.toTableReference("projects/extraslash/datasets/mydataset//tables/mytable"));
+ assertNull(
+
BigQueryUtils.toTableReference("projects/extraslash/datasets/mydataset/tables//mytable"));
+ assertNull(
+
BigQueryUtils.toTableReference("projects/extraslash/datasets/mydataset/tables/mytable/"));
+
+
assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables//"));
+
assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets//tables/mytable/"));
+
assertNull(BigQueryUtils.toTableReference("projects//datasets/mydataset/tables/mytable/"));
+ assertNull(BigQueryUtils.toTableReference("projects//datasets//tables//"));
+
+ assertNull(
+
BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables/mytable/"));
+
assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables/"));
+
assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables"));
+
assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/"));
+
assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject"));
+ assertNull(BigQueryUtils.toTableReference("projects/"));
+ assertNull(BigQueryUtils.toTableReference("projects"));
+ }
}