ajamato commented on a change in pull request #15117:
URL: https://github.com/apache/beam/pull/15117#discussion_r674231192
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -909,4 +930,101 @@ private static Object convertAvroNumeric(Object value) {
"Does not support converting avro format: " +
value.getClass().getName());
}
}
+
+ /**
+ * @param fullTableId - Is one of the three forms commonly used to refer to
bigquery tables in the
+ * beam codebase:
+ * <ul>
+ * <li>projects/{project_id}/datasets/{dataset_id}/tables/{table_id}
+ * <li>myproject:mydataset.mytable
+ * <li>myproject.mydataset.mytable
+ * </ul>
+ *
+ * @return a TableReference by parsing the fullTableId. If it
cannot be parsed properly
+ * null is returned.
+ */
+ public static @Nullable TableReference toTableReference(String fullTableId) {
+ // Try parsing the format:
+ // "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
+ Matcher m = TABLE_RESOURCE_PATTERN.matcher(fullTableId);
+ if (m.matches()) {
+ return new TableReference()
+ .setProjectId(m.group("PROJECT"))
+ .setDatasetId(m.group("DATASET"))
+ .setTableId(m.group("TABLE"));
+ }
+
+ // If that failed, try the format:
+ // "{project_id}:{dataset_id}.{table_id}" or
+ // "{project_id}.{dataset_id}.{table_id}"
+ m = SIMPLE_TABLE_PATTERN.matcher(fullTableId);
+ if (m.matches()) {
+ return new TableReference()
+ .setProjectId(m.group("PROJECT"))
+ .setDatasetId(m.group("DATASET"))
+ .setTableId(m.group("TABLE"));
+ }
+ return null;
+ }
+
+ /**
+ * @param tableReference - The table being read from. Can be a temporary BQ
table used to read
+ * from a SQL query.
+ * @return a ServiceCallMetric for recording statuses for all BQ API
responses related to reading
+ * elements directly from BigQuery in a process-wide metric. Such as:
calls to readRows,
+ * splitReadStream, createReadSession.
+ */
+ public static ServiceCallMetric readCallMetric(TableReference
tableReference) {
+ if (tableReference != null) {
+ // TODO(ajamato): Add Ptransform label. Populate it as empty for now to
prevent the
+ // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
+ HashMap<String, String> baseLabels = new HashMap<String, String>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"BigQueryBatchRead");
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigQueryTable(
+ tableReference.getProjectId(),
+ tableReference.getDatasetId(),
+ tableReference.getTableId()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID,
tableReference.getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGQUERY_DATASET,
tableReference.getDatasetId());
+ baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE,
tableReference.getTableId());
+ return new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+ }
+ return null;
+ }
+
+ /**
+ * @param tableReference - The table being written to.
+ * @return a ServiceCallMetric for recording statuses for all BQ responses
related to writing
+ * elements directly to BigQuery in a process-wide metric. Such as:
insertAll.
+ */
+ public static ServiceCallMetric writeCallMetric(TableReference
tableReference) {
Review comment:
Done
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -1344,21 +1329,82 @@ private StorageClientImpl(BigQueryOptions options)
throws IOException {
this.client = BigQueryReadClient.create(settingsBuilder.build());
}
+ // Since BigQueryReadClient client's methods are final they cannot be
mocked with Mockito for
+ // testing
+ // So this wrapper method can be mocked in tests, instead.
+ ReadSession callCreateReadSession(CreateReadSessionRequest request) {
+ return client.createReadSession(request);
+ }
+
@Override
public ReadSession createReadSession(CreateReadSessionRequest request) {
- return client.createReadSession(request);
+ TableReference tableReference =
+ BigQueryUtils.toTableReference(request.getReadSession().getTable());
+ ServiceCallMetric serviceCallMetric =
BigQueryUtils.readCallMetric(tableReference);
+ try {
+ ReadSession session = callCreateReadSession(request);
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call("ok");
+ }
+ return session;
+
+ } catch (ApiException e) {
+ if (serviceCallMetric != null
+ && e.getStatusCode() != null
Review comment:
Done
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
##########
@@ -236,8 +236,12 @@ StreamAppendClient getStreamAppendClient(String
streamName, Descriptor descripto
/** Read rows in the context of a specific read stream. */
BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request);
+ BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request,
String fullTableId);
Review comment:
Done
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -105,9 +107,18 @@
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
Table targetTable = getTargetTable(bqOptions);
- ReadSession.Builder readSessionBuilder =
- ReadSession.newBuilder()
-
.setTable(BigQueryHelpers.toTableResourceName(targetTable.getTableReference()));
+ String tableReferenceId = "";
+ if (targetTable != null) {
+ tableReferenceId =
BigQueryHelpers.toTableResourceName(targetTable.getTableReference());
+ } else {
+ // If the table does not exist targetTable will be null.
+ // Construct the table id if we can generate it. For error
recording/logging.
+ if (tableReferenceId != null) {
Review comment:
Done
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -94,6 +94,8 @@
*/
protected abstract Table getTargetTable(BigQueryOptions options) throws
Exception;
+ protected abstract String getTargetTableId(BigQueryOptions options) throws
Exception;
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]