apilloud commented on a change in pull request #15480:
URL: https://github.com/apache/beam/pull/15480#discussion_r704686826



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -965,49 +965,53 @@ public void validate(PipelineOptions options) {
       // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
       // For these cases the withoutValidation method can be used to disable the check.
       if (getValidate()) {
-        if (table != null) {
-          checkArgument(table.isAccessible(), "Cannot call validate if table is dynamically set.");
-        }
-        if (table != null && table.get().getProjectId() != null) {
-          // Check for source table presence for early failure notification.
-          DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
-          BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
-          BigQueryHelpers.verifyTablePresence(datasetService, table.get());
-        } else if (getQuery() != null) {
-          checkArgument(
-              getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
-          JobService jobService = getBigQueryServices().getJobService(bqOptions);
-          try {
-            jobService.dryRunQuery(
-                bqOptions.getBigQueryProject() == null
-                    ? bqOptions.getProject()
-                    : bqOptions.getBigQueryProject(),
-                new JobConfigurationQuery()
-                    .setQuery(getQuery().get())
-                    .setFlattenResults(getFlattenResults())
-                    .setUseLegacySql(getUseLegacySql()),
-                getQueryLocation());
-          } catch (Exception e) {
-            throw new IllegalArgumentException(
-                String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+        try (DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions)) {
+          if (table != null) {
+            checkArgument(
+                table.isAccessible(), "Cannot call validate if table is dynamically set.");
           }
+          if (table != null && table.get().getProjectId() != null) {
+            // Check for source table presence for early failure notification.
+            BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
+            BigQueryHelpers.verifyTablePresence(datasetService, table.get());
+          } else if (getQuery() != null) {
+            checkArgument(
+                getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
+            JobService jobService = getBigQueryServices().getJobService(bqOptions);
+            try {
+              jobService.dryRunQuery(
+                  bqOptions.getBigQueryProject() == null
+                      ? bqOptions.getProject()
+                      : bqOptions.getBigQueryProject(),
+                  new JobConfigurationQuery()
+                      .setQuery(getQuery().get())
+                      .setFlattenResults(getFlattenResults())
+                      .setUseLegacySql(getUseLegacySql()),
+                  getQueryLocation());
+            } catch (Exception e) {
+              throw new IllegalArgumentException(
+                  String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+            }
 
-          DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
-          // If the user provided a temp dataset, check if the dataset exists before launching the
-          // query
-          if (getQueryTempDataset() != null) {
-            // The temp table is only used for dataset and project id validation, not for table name
-            // validation
-            TableReference tempTable =
-                new TableReference()
-                    .setProjectId(
-                        bqOptions.getBigQueryProject() == null
-                            ? bqOptions.getProject()
-                            : bqOptions.getBigQueryProject())
-                    .setDatasetId(getQueryTempDataset())
-                    .setTableId("dummy table");
-            BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+            // If the user provided a temp dataset, check if the dataset exists before launching the
+            // query
+            if (getQueryTempDataset() != null) {
+              // The temp table is only used for dataset and project id validation, not for table
+              // name
+              // validation
+              TableReference tempTable =
+                  new TableReference()
+                      .setProjectId(
+                          bqOptions.getBigQueryProject() == null
+                              ? bqOptions.getProject()
+                              : bqOptions.getBigQueryProject())
+                      .setDatasetId(getQueryTempDataset())
+                      .setTableId("dummy table");
+              BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+            }
           }
+        } catch (Exception e) {

Review comment:
       Is this `catch (Exception e)` needed here? If so, can you catch more specific exceptions?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -2484,17 +2492,20 @@ public void validate(PipelineOptions pipelineOptions) {
       // The user specified a table.
       if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
         TableReference table = getTableWithDefaultProject(options).get();
-        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
-        // Check for destination table presence and emptiness for early failure notification.
-        // Note that a presence check can fail when the table or dataset is created by an earlier
-        // stage of the pipeline. For these cases the #withoutValidation method can be used to
-        // disable the check.
-        BigQueryHelpers.verifyDatasetPresence(datasetService, table);
-        if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
-          BigQueryHelpers.verifyTablePresence(datasetService, table);
-        }
-        if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-          BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+        try (DatasetService datasetService = getBigQueryServices().getDatasetService(options)) {
+          // Check for destination table presence and emptiness for early failure notification.
+          // Note that a presence check can fail when the table or dataset is created by an earlier
+          // stage of the pipeline. For these cases the #withoutValidation method can be used to
+          // disable the check.
+          BigQueryHelpers.verifyDatasetPresence(datasetService, table);
+          if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+            BigQueryHelpers.verifyTablePresence(datasetService, table);
+          }
+          if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+            BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+          }
+        } catch (Exception e) {

Review comment:
       Is this `catch (Exception e)` needed here?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -222,6 +221,15 @@ public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> i
     /** The list of unique ids for each BigQuery table row. */
     private transient Map<String, List<String>> uniqueIdsForTableRows;
 
+    private @Nullable DatasetService datasetService;

Review comment:
       nit: should this be transient?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -1401,15 +1405,19 @@ void cleanup(ContextContainer c) throws Exception {
                           options.getJobName(), jobUuid, JobType.QUERY),
                       queryTempDataset);
 
-              DatasetService datasetService = getBigQueryServices().getDatasetService(options);
-              LOG.info("Deleting temporary table with query results {}", tempTable);
-              datasetService.deleteTable(tempTable);
-              // Delete dataset only if it was created by Beam
-              boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
-              if (datasetCreatedByBeam) {
-                LOG.info(
-                    "Deleting temporary dataset with query results {}", tempTable.getDatasetId());
-                datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
+              try (DatasetService datasetService =
+                  getBigQueryServices().getDatasetService(options)) {
+                LOG.info("Deleting temporary table with query results {}", tempTable);
+                datasetService.deleteTable(tempTable);
+                // Delete dataset only if it was created by Beam
+                boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
+                if (datasetCreatedByBeam) {
+                  LOG.info(
+                      "Deleting temporary dataset with query results {}", tempTable.getDatasetId());
+                  datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
+                }
+              } catch (Exception e) {

Review comment:
       This `catch (Exception e)` doesn't look necessary. Is it needed?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -325,13 +345,22 @@ public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> i
   // shuffling.
   private class InsertBatchedElements
       extends DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>, Void> {
+    private @Nullable DatasetService datasetService;

Review comment:
       nit: should this be transient?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to