baeminbo commented on code in PR #17382:
URL: https://github.com/apache/beam/pull/17382#discussion_r852517472
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java:
##########
@@ -96,105 +96,112 @@ public static TableReference executeQuery(
throws InterruptedException, IOException {
// Step 1: Find the effective location of the query.
String effectiveLocation = location;
- DatasetService tableService = bqServices.getDatasetService(options);
- if (effectiveLocation == null) {
- List<TableReference> referencedTables =
- dryRunQueryIfNeeded(
- bqServices,
- options,
- dryRunJobStats,
- query,
- flattenResults,
- useLegacySql,
- location)
- .getQuery()
- .getReferencedTables();
- if (referencedTables != null && !referencedTables.isEmpty()) {
- TableReference referencedTable = referencedTables.get(0);
- effectiveLocation =
- tableService
- .getDataset(referencedTable.getProjectId(),
referencedTable.getDatasetId())
- .getLocation();
+ try (DatasetService tableService = bqServices.getDatasetService(options)) {
+ if (effectiveLocation == null) {
+ List<TableReference> referencedTables =
+ dryRunQueryIfNeeded(
+ bqServices,
+ options,
+ dryRunJobStats,
+ query,
+ flattenResults,
+ useLegacySql,
+ location)
+ .getQuery()
+ .getReferencedTables();
+ if (referencedTables != null && !referencedTables.isEmpty()) {
+ TableReference referencedTable = referencedTables.get(0);
+ effectiveLocation =
+ tableService
+ .getDataset(referencedTable.getProjectId(),
referencedTable.getDatasetId())
+ .getLocation();
+ }
}
- }
- // Step 2: Create a temporary dataset in the query location only if the
user has not specified a
- // temp dataset.
- String queryJobId =
- BigQueryResourceNaming.createJobIdPrefix(options.getJobName(),
stepUuid, JobType.QUERY);
- Optional<String> queryTempDatasetOpt =
Optional.ofNullable(queryTempDatasetId);
- TableReference queryResultTable =
- createTempTableReference(
- options.getBigQueryProject() == null
- ? options.getProject()
- : options.getBigQueryProject(),
- queryJobId,
- queryTempDatasetOpt);
-
- boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
- // Create dataset only if it has not been set by the user
- if (beamToCreateTempDataset) {
- LOG.info("Creating temporary dataset {} for query results",
queryResultTable.getDatasetId());
-
- tableService.createDataset(
- queryResultTable.getProjectId(),
- queryResultTable.getDatasetId(),
- effectiveLocation,
- "Temporary tables for query results of job " + options.getJobName(),
- TimeUnit.DAYS.toMillis(1));
- } else { // If the user specified a temp dataset, check that the
destination table does not
- // exist
- Table destTable = tableService.getTable(queryResultTable);
- checkArgument(
- destTable == null,
- "Refusing to write on existing table {} in the specified temp
dataset {}",
- queryResultTable.getTableId(),
- queryResultTable.getDatasetId());
- }
+ // Step 2: Create a temporary dataset in the query location only if the
user has not specified
+ // a temp dataset.
+ String queryJobId =
+ BigQueryResourceNaming.createJobIdPrefix(options.getJobName(),
stepUuid, JobType.QUERY);
+ Optional<String> queryTempDatasetOpt =
Optional.ofNullable(queryTempDatasetId);
+ TableReference queryResultTable =
+ createTempTableReference(
+ options.getBigQueryProject() == null
+ ? options.getProject()
+ : options.getBigQueryProject(),
+ queryJobId,
+ queryTempDatasetOpt);
+
+ boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
+ // Create dataset only if it has not been set by the user
+ if (beamToCreateTempDataset) {
+ LOG.info(
+ "Creating temporary dataset {} for query results",
queryResultTable.getDatasetId());
+
+ tableService.createDataset(
+ queryResultTable.getProjectId(),
+ queryResultTable.getDatasetId(),
+ effectiveLocation,
+ "Temporary tables for query results of job " +
options.getJobName(),
+ TimeUnit.DAYS.toMillis(1));
+ } else { // If the user specified a temp dataset, check that the
destination table does not
+ // exist
+ Table destTable = tableService.getTable(queryResultTable);
+ checkArgument(
+ destTable == null,
+ "Refusing to write on existing table {} in the specified temp
dataset {}",
+ queryResultTable.getTableId(),
+ queryResultTable.getDatasetId());
+ }
- // Step 3: Execute the query. Generate a transient (random) query job ID,
because this code may
- // be retried after the temporary dataset and table have been deleted by a
previous attempt --
- // in that case, we want to regenerate the temporary dataset and table,
and we'll need a fresh
- // query ID to do that.
- LOG.info(
- "Exporting query results into temporary table {} using job {}",
- queryResultTable,
- queryJobId);
-
- JobReference jobReference =
- new JobReference()
- .setProjectId(
- options.getBigQueryProject() == null
- ? options.getProject()
- : options.getBigQueryProject())
- .setLocation(effectiveLocation)
- .setJobId(queryJobId);
-
- JobConfigurationQuery queryConfiguration =
- createBasicQueryConfig(query, flattenResults, useLegacySql)
- .setAllowLargeResults(true)
- .setDestinationTable(queryResultTable)
- .setCreateDisposition("CREATE_IF_NEEDED")
- .setWriteDisposition("WRITE_TRUNCATE")
- .setPriority(priority.name());
-
- if (kmsKey != null) {
- queryConfiguration.setDestinationEncryptionConfiguration(
- new EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
+ // Step 3: Execute the query. Generate a transient (random) query job
ID, because this code
+ // may be retried after the temporary dataset and table have been
deleted by a previous
+ // attempt -- in that case, we want to regenerate the temporary dataset
and table, and we'll
+ // need a fresh query ID to do that.
+ LOG.info(
+ "Exporting query results into temporary table {} using job {}",
+ queryResultTable,
+ queryJobId);
+
+ JobReference jobReference =
+ new JobReference()
+ .setProjectId(
+ options.getBigQueryProject() == null
+ ? options.getProject()
+ : options.getBigQueryProject())
+ .setLocation(effectiveLocation)
+ .setJobId(queryJobId);
+
+ JobConfigurationQuery queryConfiguration =
+ createBasicQueryConfig(query, flattenResults, useLegacySql)
+ .setAllowLargeResults(true)
+ .setDestinationTable(queryResultTable)
+ .setCreateDisposition("CREATE_IF_NEEDED")
+ .setWriteDisposition("WRITE_TRUNCATE")
+ .setPriority(priority.name());
+
+ if (kmsKey != null) {
+ queryConfiguration.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
- JobService jobService = bqServices.getJobService(options);
- jobService.startQueryJob(jobReference, queryConfiguration);
- Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
- if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
- throw new IOException(
- String.format(
- "Query job %s failed, status: %s",
- queryJobId,
BigQueryHelpers.statusToPrettyString(job.getStatus())));
- }
+ JobService jobService = bqServices.getJobService(options);
+ jobService.startQueryJob(jobReference, queryConfiguration);
+ Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
+ if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
+ throw new IOException(
+ String.format(
+ "Query job %s failed, status: %s",
+ queryJobId,
BigQueryHelpers.statusToPrettyString(job.getStatus())));
+ }
- LOG.info("Query job {} completed", queryJobId);
- return queryResultTable;
+ LOG.info("Query job {} completed", queryJobId);
+ return queryResultTable;
+
+ } catch (RuntimeException | IOException | InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
The method declares only `InterruptedException` and `IOException` as checked exceptions. For this pattern I referred to
[BigQueryHelpers.getNumRows](https://github.com/apache/beam/blob/v2.38.0-RC1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java#L554-L556).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]