[ 
https://issues.apache.org/jira/browse/BEAM-12356?focusedWorklogId=648163&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-648163
 ]

ASF GitHub Bot logged work on BEAM-12356:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Sep/21 19:07
            Start Date: 08/Sep/21 19:07
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on a change in pull request #15480:
URL: https://github.com/apache/beam/pull/15480#discussion_r704697764



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -965,49 +965,53 @@ public void validate(PipelineOptions options) {
       // earlier stages of the pipeline or if a query depends on earlier 
stages of a pipeline.
       // For these cases the withoutValidation method can be used to disable 
the check.
       if (getValidate()) {
-        if (table != null) {
-          checkArgument(table.isAccessible(), "Cannot call validate if table 
is dynamically set.");
-        }
-        if (table != null && table.get().getProjectId() != null) {
-          // Check for source table presence for early failure notification.
-          DatasetService datasetService = 
getBigQueryServices().getDatasetService(bqOptions);
-          BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
-          BigQueryHelpers.verifyTablePresence(datasetService, table.get());
-        } else if (getQuery() != null) {
-          checkArgument(
-              getQuery().isAccessible(), "Cannot call validate if query is 
dynamically set.");
-          JobService jobService = 
getBigQueryServices().getJobService(bqOptions);
-          try {
-            jobService.dryRunQuery(
-                bqOptions.getBigQueryProject() == null
-                    ? bqOptions.getProject()
-                    : bqOptions.getBigQueryProject(),
-                new JobConfigurationQuery()
-                    .setQuery(getQuery().get())
-                    .setFlattenResults(getFlattenResults())
-                    .setUseLegacySql(getUseLegacySql()),
-                getQueryLocation());
-          } catch (Exception e) {
-            throw new IllegalArgumentException(
-                String.format(QUERY_VALIDATION_FAILURE_ERROR, 
getQuery().get()), e);
+        try (DatasetService datasetService = 
getBigQueryServices().getDatasetService(bqOptions)) {
+          if (table != null) {
+            checkArgument(
+                table.isAccessible(), "Cannot call validate if table is 
dynamically set.");
           }
+          if (table != null && table.get().getProjectId() != null) {
+            // Check for source table presence for early failure notification.
+            BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
+            BigQueryHelpers.verifyTablePresence(datasetService, table.get());
+          } else if (getQuery() != null) {
+            checkArgument(
+                getQuery().isAccessible(), "Cannot call validate if query is 
dynamically set.");
+            JobService jobService = 
getBigQueryServices().getJobService(bqOptions);
+            try {
+              jobService.dryRunQuery(
+                  bqOptions.getBigQueryProject() == null
+                      ? bqOptions.getProject()
+                      : bqOptions.getBigQueryProject(),
+                  new JobConfigurationQuery()
+                      .setQuery(getQuery().get())
+                      .setFlattenResults(getFlattenResults())
+                      .setUseLegacySql(getUseLegacySql()),
+                  getQueryLocation());
+            } catch (Exception e) {
+              throw new IllegalArgumentException(
+                  String.format(QUERY_VALIDATION_FAILURE_ERROR, 
getQuery().get()), e);
+            }
 
-          DatasetService datasetService = 
getBigQueryServices().getDatasetService(bqOptions);
-          // If the user provided a temp dataset, check if the dataset exists 
before launching the
-          // query
-          if (getQueryTempDataset() != null) {
-            // The temp table is only used for dataset and project id 
validation, not for table name
-            // validation
-            TableReference tempTable =
-                new TableReference()
-                    .setProjectId(
-                        bqOptions.getBigQueryProject() == null
-                            ? bqOptions.getProject()
-                            : bqOptions.getBigQueryProject())
-                    .setDatasetId(getQueryTempDataset())
-                    .setTableId("dummy table");
-            BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+            // If the user provided a temp dataset, check if the dataset 
exists before launching the
+            // query
+            if (getQueryTempDataset() != null) {
+              // The temp table is only used for dataset and project id 
validation, not for table
+              // name
+              // validation
+              TableReference tempTable =
+                  new TableReference()
+                      .setProjectId(
+                          bqOptions.getBigQueryProject() == null
+                              ? bqOptions.getProject()
+                              : bqOptions.getBigQueryProject())
+                      .setDatasetId(getQueryTempDataset())
+                      .setTableId("dummy table");
+              BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
+            }
           }
+        } catch (Exception e) {

Review comment:
       close() throws Exception, so this is the best I can do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 648163)
    Time Spent: 3h  (was: 2h 50m)

> BigQueryWriteClient in DatasetServiceImpl is not closed, which causes 
> "ManagedChannel allocation site" exceptions
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12356
>                 URL: https://issues.apache.org/jira/browse/BEAM-12356
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.29.0
>            Reporter: Minbo Bae
>            Assignee: Reuven Lax
>            Priority: P2
>             Fix For: 2.31.0
>
>         Attachments: bigquery_grpc.log
>
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> [BigQueryWriteClient|https://github.com/apache/beam/blob/v2.29.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L461]
>  in DatasetServiceImpl (added at [https://github.com/apache/beam/pull/14309)] 
> is not closed.  This causes the error logs  in gRPC orphan channel clean up. 
> See "bigquery_grpc.log" in attachments which is extracted from GCP Dataflow. 
> I don't think this issue affect pipeline runs except the error logs, but 
> could you take a look at that?
> A similar issue is reported for {{CloudBigtableIO}} at 
> [https://github.com/googleapis/java-bigtable-hbase/issues/2658]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to