[ https://issues.apache.org/jira/browse/BEAM-12356?focusedWorklogId=648163&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-648163 ]
ASF GitHub Bot logged work on BEAM-12356:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Sep/21 19:07
Start Date: 08/Sep/21 19:07
Worklog Time Spent: 10m
Work Description: reuvenlax commented on a change in pull request #15480:
URL: https://github.com/apache/beam/pull/15480#discussion_r704697764
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -965,49 +965,53 @@ public void validate(PipelineOptions options) {
// earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
// For these cases the withoutValidation method can be used to disable the check.
if (getValidate()) {
- if (table != null) {
- checkArgument(table.isAccessible(), "Cannot call validate if table is dynamically set.");
- }
- if (table != null && table.get().getProjectId() != null) {
- // Check for source table presence for early failure notification.
- DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
- BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
- BigQueryHelpers.verifyTablePresence(datasetService, table.get());
- } else if (getQuery() != null) {
- checkArgument(
- getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
- JobService jobService = getBigQueryServices().getJobService(bqOptions);
- try {
- jobService.dryRunQuery(
- bqOptions.getBigQueryProject() == null
- ? bqOptions.getProject()
- : bqOptions.getBigQueryProject(),
- new JobConfigurationQuery()
- .setQuery(getQuery().get())
- .setFlattenResults(getFlattenResults())
- .setUseLegacySql(getUseLegacySql()),
- getQueryLocation());
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+ try (DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions)) {
+ if (table != null) {
+ checkArgument(
+ table.isAccessible(), "Cannot call validate if table is dynamically set.");
}
+ if (table != null && table.get().getProjectId() != null) {
+ // Check for source table presence for early failure notification.
+ BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
+ BigQueryHelpers.verifyTablePresence(datasetService, table.get());
+ } else if (getQuery() != null) {
+ checkArgument(
+ getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
+ JobService jobService = getBigQueryServices().getJobService(bqOptions);
+ try {
+ jobService.dryRunQuery(
+ bqOptions.getBigQueryProject() == null
+ ? bqOptions.getProject()
+ : bqOptions.getBigQueryProject(),
+ new JobConfigurationQuery()
+ .setQuery(getQuery().get())
+ .setFlattenResults(getFlattenResults())
+ .setUseLegacySql(getUseLegacySql()),
+ getQueryLocation());
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
+ }
- DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
- // If the user provided a temp dataset, check if the dataset exists before launching the
- // query
- if (getQueryTempDataset() != null) {
- // The temp table is only used for dataset and project id validation, not for table name
- // validation
- TableReference tempTable =
- new TableReference()
- .setProjectId(
- bqOptions.getBigQueryProject() == null
- ? bqOptions.getProject()
- : bqOptions.getBigQueryProject())
- .setDatasetId(getQueryTempDataset())
- .setTableId("dummy table");
- BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+ // If the user provided a temp dataset, check if the dataset exists before launching the
+ // query
+ if (getQueryTempDataset() != null) {
+ // The temp table is only used for dataset and project id validation, not for table
+ // name
+ // validation
+ TableReference tempTable =
+ new TableReference()
+ .setProjectId(
+ bqOptions.getBigQueryProject() == null
+ ? bqOptions.getProject()
+ : bqOptions.getBigQueryProject())
+ .setDatasetId(getQueryTempDataset())
+ .setTableId("dummy table");
+ BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+ }
}
+ } catch (Exception e) {
Review comment:
close() throws Exception, so this is the best I can do.
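
For readers unfamiliar with the constraint mentioned in the comment, here is a minimal sketch of why a broad catch clause becomes necessary. It assumes the service type implements java.lang.AutoCloseable without narrowing the exception declared by close(). The class FakeDatasetService and the method validate() below are hypothetical stand-ins for illustration only, not Beam's actual types.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseThrowsSketch {

  /** Hypothetical stand-in for a service whose close() is declared to throw Exception. */
  static class FakeDatasetService implements AutoCloseable {
    private final List<String> events;

    FakeDatasetService(List<String> events) {
      this.events = events;
    }

    void verify() {
      events.add("verify");
    }

    @Override
    public void close() throws Exception {
      // AutoCloseable.close() is declared with "throws Exception"; this
      // override keeps that broad signature, as assumed in the lead-in.
      events.add("close");
    }
  }

  /** Uses the service in try-with-resources and records the order of calls. */
  static List<String> validate() {
    List<String> events = new ArrayList<>();
    try (FakeDatasetService service = new FakeDatasetService(events)) {
      service.verify();
    } catch (Exception e) {
      // Without this clause the code does not compile, because the implicit
      // close() call can throw a checked Exception. Rethrowing unchecked is
      // one possible choice, not necessarily what the pull request does.
      throw new RuntimeException(e);
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(validate());
  }
}
```

If the resource type narrowed its close() signature (for example, declaring no checked exception), the catch clause could be narrowed or dropped; that would be an API change outside the scope of this sketch.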
Issue Time Tracking
-------------------
Worklog Id: (was: 648163)
Time Spent: 3h (was: 2h 50m)
> BigQueryWriteClient in DatasetServiceImpl is not closed, which causes
> "ManagedChannel allocation site" exceptions
> -----------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-12356
> URL: https://issues.apache.org/jira/browse/BEAM-12356
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.29.0
> Reporter: Minbo Bae
> Assignee: Reuven Lax
> Priority: P2
> Fix For: 2.31.0
>
> Attachments: bigquery_grpc.log
>
> Time Spent: 3h
> Remaining Estimate: 0h
>
> [BigQueryWriteClient|https://github.com/apache/beam/blob/v2.29.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L461]
> in DatasetServiceImpl (added in [https://github.com/apache/beam/pull/14309])
> is not closed. This causes error logs during gRPC orphan channel cleanup.
> See "bigquery_grpc.log" in the attachments, which was extracted from GCP Dataflow.
> I don't think this issue affects pipeline runs beyond the error logs, but
> could you take a look at it?
> A similar issue is reported for {{CloudBigtableIO}} at
> [https://github.com/googleapis/java-bigtable-hbase/issues/2658]
>