baeminbo commented on code in PR #17382:
URL: https://github.com/apache/beam/pull/17382#discussion_r852549194


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java:
##########
@@ -96,105 +96,112 @@ public static TableReference executeQuery(
       throws InterruptedException, IOException {
     // Step 1: Find the effective location of the query.
     String effectiveLocation = location;
-    DatasetService tableService = bqServices.getDatasetService(options);
-    if (effectiveLocation == null) {
-      List<TableReference> referencedTables =
-          dryRunQueryIfNeeded(
-                  bqServices,
-                  options,
-                  dryRunJobStats,
-                  query,
-                  flattenResults,
-                  useLegacySql,
-                  location)
-              .getQuery()
-              .getReferencedTables();
-      if (referencedTables != null && !referencedTables.isEmpty()) {
-        TableReference referencedTable = referencedTables.get(0);
-        effectiveLocation =
-            tableService
-                .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
-                .getLocation();
+    try (DatasetService tableService = bqServices.getDatasetService(options)) {
+      if (effectiveLocation == null) {
+        List<TableReference> referencedTables =
+            dryRunQueryIfNeeded(
+                    bqServices,
+                    options,
+                    dryRunJobStats,
+                    query,
+                    flattenResults,
+                    useLegacySql,
+                    location)
+                .getQuery()
+                .getReferencedTables();
+        if (referencedTables != null && !referencedTables.isEmpty()) {
+          TableReference referencedTable = referencedTables.get(0);
+          effectiveLocation =
+              tableService
+                  .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
+                  .getLocation();
+        }
       }
-    }
 
-    // Step 2: Create a temporary dataset in the query location only if the user has not specified a
-    // temp dataset.
-    String queryJobId =
-        BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
-    Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
-    TableReference queryResultTable =
-        createTempTableReference(
-            options.getBigQueryProject() == null
-                ? options.getProject()
-                : options.getBigQueryProject(),
-            queryJobId,
-            queryTempDatasetOpt);
-
-    boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
-    // Create dataset only if it has not been set by the user
-    if (beamToCreateTempDataset) {
-      LOG.info("Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
-
-      tableService.createDataset(
-          queryResultTable.getProjectId(),
-          queryResultTable.getDatasetId(),
-          effectiveLocation,
-          "Temporary tables for query results of job " + options.getJobName(),
-          TimeUnit.DAYS.toMillis(1));
-    } else { // If the user specified a temp dataset, check that the destination table does not
-      // exist
-      Table destTable = tableService.getTable(queryResultTable);
-      checkArgument(
-          destTable == null,
-          "Refusing to write on existing table {} in the specified temp dataset {}",
-          queryResultTable.getTableId(),
-          queryResultTable.getDatasetId());
-    }
+      // Step 2: Create a temporary dataset in the query location only if the user has not specified
+      // a temp dataset.
+      String queryJobId =
+          BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
+      Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
+      TableReference queryResultTable =
+          createTempTableReference(
+              options.getBigQueryProject() == null
+                  ? options.getProject()
+                  : options.getBigQueryProject(),
+              queryJobId,
+              queryTempDatasetOpt);
+
+      boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
+      // Create dataset only if it has not been set by the user
+      if (beamToCreateTempDataset) {
+        LOG.info(
+            "Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
+
+        tableService.createDataset(
+            queryResultTable.getProjectId(),
+            queryResultTable.getDatasetId(),
+            effectiveLocation,
+            "Temporary tables for query results of job " + options.getJobName(),
+            TimeUnit.DAYS.toMillis(1));
+      } else { // If the user specified a temp dataset, check that the destination table does not
+        // exist
+        Table destTable = tableService.getTable(queryResultTable);
+        checkArgument(
+            destTable == null,
+            "Refusing to write on existing table {} in the specified temp dataset {}",
+            queryResultTable.getTableId(),
+            queryResultTable.getDatasetId());
+      }
 
-    // Step 3: Execute the query. Generate a transient (random) query job ID, because this code may
-    // be retried after the temporary dataset and table have been deleted by a previous attempt --
-    // in that case, we want to regenerate the temporary dataset and table, and we'll need a fresh
-    // query ID to do that.
-    LOG.info(
-        "Exporting query results into temporary table {} using job {}",
-        queryResultTable,
-        queryJobId);
-
-    JobReference jobReference =
-        new JobReference()
-            .setProjectId(
-                options.getBigQueryProject() == null
-                    ? options.getProject()
-                    : options.getBigQueryProject())
-            .setLocation(effectiveLocation)
-            .setJobId(queryJobId);
-
-    JobConfigurationQuery queryConfiguration =
-        createBasicQueryConfig(query, flattenResults, useLegacySql)
-            .setAllowLargeResults(true)
-            .setDestinationTable(queryResultTable)
-            .setCreateDisposition("CREATE_IF_NEEDED")
-            .setWriteDisposition("WRITE_TRUNCATE")
-            .setPriority(priority.name());
-
-    if (kmsKey != null) {
-      queryConfiguration.setDestinationEncryptionConfiguration(
-          new EncryptionConfiguration().setKmsKeyName(kmsKey));
-    }
+      // Step 3: Execute the query. Generate a transient (random) query job ID, because this code
+      // may be retried after the temporary dataset and table have been deleted by a previous
+      // attempt -- in that case, we want to regenerate the temporary dataset and table, and we'll
+      // need a fresh query ID to do that.
+      LOG.info(
+          "Exporting query results into temporary table {} using job {}",
+          queryResultTable,
+          queryJobId);
+
+      JobReference jobReference =
+          new JobReference()
+              .setProjectId(
+                  options.getBigQueryProject() == null
+                      ? options.getProject()
+                      : options.getBigQueryProject())
+              .setLocation(effectiveLocation)
+              .setJobId(queryJobId);
+
+      JobConfigurationQuery queryConfiguration =
+          createBasicQueryConfig(query, flattenResults, useLegacySql)
+              .setAllowLargeResults(true)
+              .setDestinationTable(queryResultTable)
+              .setCreateDisposition("CREATE_IF_NEEDED")
+              .setWriteDisposition("WRITE_TRUNCATE")
+              .setPriority(priority.name());
+
+      if (kmsKey != null) {
+        queryConfiguration.setDestinationEncryptionConfiguration(
+            new EncryptionConfiguration().setKmsKeyName(kmsKey));
+      }
 
-    JobService jobService = bqServices.getJobService(options);
-    jobService.startQueryJob(jobReference, queryConfiguration);
-    Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
-    if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
-      throw new IOException(
-          String.format(
-              "Query job %s failed, status: %s",
-              queryJobId, BigQueryHelpers.statusToPrettyString(job.getStatus())));
-    }
+      JobService jobService = bqServices.getJobService(options);
+      jobService.startQueryJob(jobReference, queryConfiguration);
+      Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
+      if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
+        throw new IOException(
+            String.format(
+                "Query job %s failed, status: %s",
+                queryJobId, BigQueryHelpers.statusToPrettyString(job.getStatus())));
+      }
 
-    LOG.info("Query job {} completed", queryJobId);
-    return queryResultTable;
+      LOG.info("Query job {} completed", queryJobId);
+      return queryResultTable;
+
+    } catch (RuntimeException | IOException | InterruptedException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }

Review Comment:
   `DatasetService.close()` throws `Exception` (not `IOException`), while `executeQuery` declares only `IOException` and `InterruptedException` in its throws clause. Without the new catch blocks, `try (DatasetService tableService = bqServices.getDatasetService(options)) { ... }` causes a compile error, because the implicit `close()` call can throw a checked `Exception` that is neither caught nor declared.
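   A minimal, self-contained sketch of the problem (names like `FakeDatasetService` and `executeQueryLike` are made up for illustration; only the `AutoCloseable` semantics matter):
   
   ```java
   import java.io.IOException;
   
   class TryWithResourcesSketch {
   
     // Stand-in for DatasetService: close() declares the broad checked Exception.
     interface FakeDatasetService extends AutoCloseable {
       @Override
       void close() throws Exception;
     }
   
     // Mirrors executeQuery's signature: only IOException and InterruptedException are declared.
     static void executeQueryLike(FakeDatasetService service)
         throws IOException, InterruptedException {
       // Without the two catch blocks below, this would not compile: the implicit
       // close() call can throw Exception, which is neither caught nor declared.
       try (FakeDatasetService s = service) {
         // ... use the service ...
       } catch (RuntimeException | IOException | InterruptedException e) {
         throw e; // already permitted by the signature, rethrow unchanged
       } catch (Exception e) {
         throw new RuntimeException(e); // wrap anything else, e.g. from close()
       }
     }
   }
   ```
   
   The two catch blocks in the change follow the same pattern: exceptions already compatible with the signature are rethrown as-is, and any other checked exception (in practice, one from `close()`) is wrapped in a `RuntimeException`.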
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
