apilloud commented on a change in pull request #15480:
URL: https://github.com/apache/beam/pull/15480#discussion_r704686826
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -965,49 +965,53 @@ public void validate(PipelineOptions options) {
// earlier stages of the pipeline or if a query depends on earlier
stages of a pipeline.
// For these cases the withoutValidation method can be used to disable
the check.
if (getValidate()) {
- if (table != null) {
- checkArgument(table.isAccessible(), "Cannot call validate if table
is dynamically set.");
- }
- if (table != null && table.get().getProjectId() != null) {
- // Check for source table presence for early failure notification.
- DatasetService datasetService =
getBigQueryServices().getDatasetService(bqOptions);
- BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
- BigQueryHelpers.verifyTablePresence(datasetService, table.get());
- } else if (getQuery() != null) {
- checkArgument(
- getQuery().isAccessible(), "Cannot call validate if query is
dynamically set.");
- JobService jobService =
getBigQueryServices().getJobService(bqOptions);
- try {
- jobService.dryRunQuery(
- bqOptions.getBigQueryProject() == null
- ? bqOptions.getProject()
- : bqOptions.getBigQueryProject(),
- new JobConfigurationQuery()
- .setQuery(getQuery().get())
- .setFlattenResults(getFlattenResults())
- .setUseLegacySql(getUseLegacySql()),
- getQueryLocation());
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format(QUERY_VALIDATION_FAILURE_ERROR,
getQuery().get()), e);
+ try (DatasetService datasetService =
getBigQueryServices().getDatasetService(bqOptions)) {
+ if (table != null) {
+ checkArgument(
+ table.isAccessible(), "Cannot call validate if table is
dynamically set.");
}
+ if (table != null && table.get().getProjectId() != null) {
+ // Check for source table presence for early failure notification.
+ BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
+ BigQueryHelpers.verifyTablePresence(datasetService, table.get());
+ } else if (getQuery() != null) {
+ checkArgument(
+ getQuery().isAccessible(), "Cannot call validate if query is
dynamically set.");
+ JobService jobService =
getBigQueryServices().getJobService(bqOptions);
+ try {
+ jobService.dryRunQuery(
+ bqOptions.getBigQueryProject() == null
+ ? bqOptions.getProject()
+ : bqOptions.getBigQueryProject(),
+ new JobConfigurationQuery()
+ .setQuery(getQuery().get())
+ .setFlattenResults(getFlattenResults())
+ .setUseLegacySql(getUseLegacySql()),
+ getQueryLocation());
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(QUERY_VALIDATION_FAILURE_ERROR,
getQuery().get()), e);
+ }
- DatasetService datasetService =
getBigQueryServices().getDatasetService(bqOptions);
- // If the user provided a temp dataset, check if the dataset exists
before launching the
- // query
- if (getQueryTempDataset() != null) {
- // The temp table is only used for dataset and project id
validation, not for table name
- // validation
- TableReference tempTable =
- new TableReference()
- .setProjectId(
- bqOptions.getBigQueryProject() == null
- ? bqOptions.getProject()
- : bqOptions.getBigQueryProject())
- .setDatasetId(getQueryTempDataset())
- .setTableId("dummy table");
- BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+ // If the user provided a temp dataset, check if the dataset
exists before launching the
+ // query
+ if (getQueryTempDataset() != null) {
+ // The temp table is only used for dataset and project id
validation, not for table
+ // name
+ // validation
+ TableReference tempTable =
+ new TableReference()
+ .setProjectId(
+ bqOptions.getBigQueryProject() == null
+ ? bqOptions.getProject()
+ : bqOptions.getBigQueryProject())
+ .setDatasetId(getQueryTempDataset())
+ .setTableId("dummy table");
+ BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+ }
}
+ } catch (Exception e) {
Review comment:
Is this `catch (Exception e)` needed here? If it is, can you catch more specific exceptions instead?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -2484,17 +2492,20 @@ public void validate(PipelineOptions pipelineOptions) {
// The user specified a table.
if (getJsonTableRef() != null && getJsonTableRef().isAccessible() &&
getValidate()) {
TableReference table = getTableWithDefaultProject(options).get();
- DatasetService datasetService =
getBigQueryServices().getDatasetService(options);
- // Check for destination table presence and emptiness for early
failure notification.
- // Note that a presence check can fail when the table or dataset is
created by an earlier
- // stage of the pipeline. For these cases the #withoutValidation
method can be used to
- // disable the check.
- BigQueryHelpers.verifyDatasetPresence(datasetService, table);
- if (getCreateDisposition() ==
BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
- BigQueryHelpers.verifyTablePresence(datasetService, table);
- }
- if (getWriteDisposition() ==
BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
- BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+ try (DatasetService datasetService =
getBigQueryServices().getDatasetService(options)) {
+ // Check for destination table presence and emptiness for early
failure notification.
+ // Note that a presence check can fail when the table or dataset is
created by an earlier
+ // stage of the pipeline. For these cases the #withoutValidation
method can be used to
+ // disable the check.
+ BigQueryHelpers.verifyDatasetPresence(datasetService, table);
+ if (getCreateDisposition() ==
BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+ BigQueryHelpers.verifyTablePresence(datasetService, table);
+ }
+ if (getWriteDisposition() ==
BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+ BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+ }
+ } catch (Exception e) {
Review comment:
Is this `catch (Exception e)` block needed?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -222,6 +221,15 @@ public PCollectionTuple expand(PCollection<KV<String,
TableRowInfo<ElementT>>> i
/** The list of unique ids for each BigQuery table row. */
private transient Map<String, List<String>> uniqueIdsForTableRows;
+ private @Nullable DatasetService datasetService;
Review comment:
nit: should this field be `transient`?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -1401,15 +1405,19 @@ void cleanup(ContextContainer c) throws Exception {
options.getJobName(), jobUuid, JobType.QUERY),
queryTempDataset);
- DatasetService datasetService =
getBigQueryServices().getDatasetService(options);
- LOG.info("Deleting temporary table with query results {}",
tempTable);
- datasetService.deleteTable(tempTable);
- // Delete dataset only if it was created by Beam
- boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
- if (datasetCreatedByBeam) {
- LOG.info(
- "Deleting temporary dataset with query results {}",
tempTable.getDatasetId());
- datasetService.deleteDataset(tempTable.getProjectId(),
tempTable.getDatasetId());
+ try (DatasetService datasetService =
+ getBigQueryServices().getDatasetService(options)) {
+ LOG.info("Deleting temporary table with query results {}",
tempTable);
+ datasetService.deleteTable(tempTable);
+ // Delete dataset only if it was created by Beam
+ boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
+ if (datasetCreatedByBeam) {
+ LOG.info(
+ "Deleting temporary dataset with query results {}",
tempTable.getDatasetId());
+ datasetService.deleteDataset(tempTable.getProjectId(),
tempTable.getDatasetId());
+ }
+ } catch (Exception e) {
Review comment:
It doesn't look like this `catch` block is needed. Can it be removed?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -325,13 +345,22 @@ public PCollectionTuple expand(PCollection<KV<String,
TableRowInfo<ElementT>>> i
// shuffling.
private class InsertBatchedElements
extends DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>,
Void> {
+ private @Nullable DatasetService datasetService;
Review comment:
nit: should this field be `transient`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]