ihji commented on a change in pull request #15117:
URL: https://github.com/apache/beam/pull/15117#discussion_r673538941



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -1344,21 +1329,82 @@ private StorageClientImpl(BigQueryOptions options) throws IOException {
       this.client = BigQueryReadClient.create(settingsBuilder.build());
     }
 
+    // Since BigQueryReadClient client's methods are final they cannot be mocked with Mockito for
+    // testing
+    // So this wrapper method can be mocked in tests, instead.
+    ReadSession callCreateReadSession(CreateReadSessionRequest request) {
+      return client.createReadSession(request);
+    }
+
     @Override
     public ReadSession createReadSession(CreateReadSessionRequest request) {
-      return client.createReadSession(request);
+      TableReference tableReference =
+          BigQueryUtils.toTableReference(request.getReadSession().getTable());
+      ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+      try {
+        ReadSession session = callCreateReadSession(request);
+        if (serviceCallMetric != null) {
+          serviceCallMetric.call("ok");
+        }
+        return session;
+
+      } catch (ApiException e) {
+        if (serviceCallMetric != null
+            && e.getStatusCode() != null

Review comment:
       Is it necessary to null-check every link in this call chain?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -105,9 +107,18 @@
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     Table targetTable = getTargetTable(bqOptions);
 
-    ReadSession.Builder readSessionBuilder =
-        ReadSession.newBuilder()
-            .setTable(BigQueryHelpers.toTableResourceName(targetTable.getTableReference()));
+    String tableReferenceId = "";
+    if (targetTable != null) {
+      tableReferenceId = BigQueryHelpers.toTableResourceName(targetTable.getTableReference());
+    } else {
+      // If the table does not exist targetTable will be null.
+      // Construct the table id if we can generate it. For error recording/logging.
+      if (tableReferenceId != null) {

Review comment:
       Looks like `tableReferenceId != null` is always true here, since the variable is initialized to `""` a few lines above.
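
One possible reading, sketched below with plain strings rather than the real Beam types: the null check may have been meant for the id obtained from the new `getTargetTableId` hook, not for the local that was just initialized. The class, method, and parameter names are hypothetical, and the intended fix is for the author to confirm.

```java
/** Sketch only: plain strings stand in for the values used in BigQueryStorageSourceBase. */
final class TableIdForMetrics {
  /**
   * Hypothetical helper. Prefers the resource name derived from an existing table, falls
   * back to the configured table id when the table does not exist, and returns "" when
   * neither value is available.
   */
  static String resolve(String resourceNameFromTable, String configuredTableId) {
    if (resourceNameFromTable != null) {
      return resourceNameFromTable;
    }
    // This check can actually fail: the configured id may be absent.
    return configuredTableId != null ? configuredTableId : "";
  }
}
```

With this shape the fallback branch tests a value that can genuinely be null, so the condition is no longer trivially true.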

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -94,6 +94,8 @@
    */
   protected abstract Table getTargetTable(BigQueryOptions options) throws Exception;
 
+  protected abstract String getTargetTableId(BigQueryOptions options) throws Exception;

Review comment:
       This might need a `@Nullable` annotation?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
##########
@@ -236,8 +236,12 @@ StreamAppendClient getStreamAppendClient(String streamName, Descriptor descripto
     /** Read rows in the context of a specific read stream. */
     BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request);
 
+    BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request, String fullTableId);

Review comment:
       It seems that only the `fullTableId` variants support metric capture. Could you make that explicit in the comment to avoid confusion (or remove the non-`fullTableId` variants)?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -909,4 +930,101 @@ private static Object convertAvroNumeric(Object value) {
           "Does not support converting avro format: " + value.getClass().getName());
     }
   }
+
+  /**
+   * @param fullTableId - Is one of the two forms commonly used to refer to bigquery tables in the
+   *     beam codebase:
+   *     <ul>
+   *       <li>projects/{project_id}/datasets/{dataset_id}/tables/{table_id}
+   *       <li>myproject:mydataset.mytable
+   *       <li>myproject.mydataset.mytable
+   *     </ul>
+   *
+   * @return a BigQueryTableIdentifier by parsing the fullTableId. If it cannot be parsed properly
+   *     null is returned.
+   */
+  public static @Nullable TableReference toTableReference(String fullTableId) {
+    // Try parsing the format:
+    // "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
+    Matcher m = TABLE_RESOURCE_PATTERN.matcher(fullTableId);
+    if (m.matches()) {
+      return new TableReference()
+          .setProjectId(m.group("PROJECT"))
+          .setDatasetId(m.group("DATASET"))
+          .setTableId(m.group("TABLE"));
+    }
+
+    // If that failed, try the format:
+    // "{project_id}:{dataset_id}.{table_id}" or
+    // "{project_id}.{dataset_id}.{table_id}"
+    m = SIMPLE_TABLE_PATTERN.matcher(fullTableId);
+    if (m.matches()) {
+      return new TableReference()
+          .setProjectId(m.group("PROJECT"))
+          .setDatasetId(m.group("DATASET"))
+          .setTableId(m.group("TABLE"));
+    }
+    return null;
+  }
+
+  /**
+   * @param tableReference - The table being read from. Can be a temporary BQ table used to read
+   *     from a SQL query.
+   * @return a ServiceCallMetric for recording statuses for all BQ API responses related to reading
+   *     elements directly from BigQuery in a process-wide metric. Such as: calls to readRows,
+   *     splitReadStream, createReadSession.
+   */
+  public static ServiceCallMetric readCallMetric(TableReference tableReference) {
+    if (tableReference != null) {
+      // TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
+      // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
+      HashMap<String, String> baseLabels = new HashMap<String, String>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchRead");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigQueryTable(
+              tableReference.getProjectId(),
+              tableReference.getDatasetId(),
+              tableReference.getTableId()));
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, tableReference.getProjectId());
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.BIGQUERY_DATASET, tableReference.getDatasetId());
+      baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, tableReference.getTableId());
+      return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+    }
+    return null;
+  }
+
+  /**
+   * @param tableReference - The table being written to.
+   * @return a ServiceCallMetric for recording statuses for all BQ responses related to writing
+   *     elements directly to BigQuery in a process-wide metric. Such as: insertAll.
+   */
+  public static ServiceCallMetric writeCallMetric(TableReference tableReference) {

Review comment:
       `writeCallMetric` and `readCallMetric` look almost identical except for the `METHOD` field. Is it possible to refactor the common part into a separate method?
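
A minimal sketch of what such a refactor could look like, assuming the two methods differ only in the `METHOD` label. Plain string keys stand in for `MonitoringInfoConstants.Labels`, a `Map` stands in for `ServiceCallMetric`, and the `RESOURCE` label is omitted for brevity. The helper name `baseLabels` and the write-side value `"BigQueryBatchWrite"` are assumptions, since the body of `writeCallMetric` is not shown in this hunk.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: string keys stand in for MonitoringInfoConstants.Labels. */
final class CallMetricLabels {
  // Hypothetical shared helper: builds every label the read and write metrics have in common.
  static Map<String, String> baseLabels(
      String method, String projectId, String datasetId, String tableId) {
    Map<String, String> labels = new HashMap<>();
    labels.put("PTRANSFORM", ""); // left empty, as in the diff under review
    labels.put("SERVICE", "BigQuery");
    labels.put("METHOD", method); // the only field that differs between the callers
    labels.put("BIGQUERY_PROJECT_ID", projectId);
    labels.put("BIGQUERY_DATASET", datasetId);
    labels.put("BIGQUERY_TABLE", tableId);
    return labels;
  }

  static Map<String, String> readCallLabels(String projectId, String datasetId, String tableId) {
    return baseLabels("BigQueryBatchRead", projectId, datasetId, tableId);
  }

  // "BigQueryBatchWrite" is an assumed value; the real one is not visible in this hunk.
  static Map<String, String> writeCallLabels(String projectId, String datasetId, String tableId) {
    return baseLabels("BigQueryBatchWrite", projectId, datasetId, tableId);
  }
}
```

In the real code the two public methods would presumably keep their signatures and null handling, each passing its own method name to the shared helper.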




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
