[ 
https://issues.apache.org/jira/browse/BEAM-12356?focusedWorklogId=758213&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758213
 ]

ASF GitHub Bot logged work on BEAM-12356:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Apr/22 02:28
            Start Date: 19/Apr/22 02:28
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on code in PR #17382:
URL: https://github.com/apache/beam/pull/17382#discussion_r852542834


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java:
##########
@@ -96,105 +96,112 @@ public static TableReference executeQuery(
       throws InterruptedException, IOException {
     // Step 1: Find the effective location of the query.
     String effectiveLocation = location;
-    DatasetService tableService = bqServices.getDatasetService(options);
-    if (effectiveLocation == null) {
-      List<TableReference> referencedTables =
-          dryRunQueryIfNeeded(
-                  bqServices,
-                  options,
-                  dryRunJobStats,
-                  query,
-                  flattenResults,
-                  useLegacySql,
-                  location)
-              .getQuery()
-              .getReferencedTables();
-      if (referencedTables != null && !referencedTables.isEmpty()) {
-        TableReference referencedTable = referencedTables.get(0);
-        effectiveLocation =
-            tableService
-                .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
-                .getLocation();
+    try (DatasetService tableService = bqServices.getDatasetService(options)) {
+      if (effectiveLocation == null) {
+        List<TableReference> referencedTables =
+            dryRunQueryIfNeeded(
+                    bqServices,
+                    options,
+                    dryRunJobStats,
+                    query,
+                    flattenResults,
+                    useLegacySql,
+                    location)
+                .getQuery()
+                .getReferencedTables();
+        if (referencedTables != null && !referencedTables.isEmpty()) {
+          TableReference referencedTable = referencedTables.get(0);
+          effectiveLocation =
+              tableService
+                  .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
+                  .getLocation();
+        }
       }
-    }
 
-    // Step 2: Create a temporary dataset in the query location only if the user has not specified a
-    // temp dataset.
-    String queryJobId =
-        BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
-    Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
-    TableReference queryResultTable =
-        createTempTableReference(
-            options.getBigQueryProject() == null
-                ? options.getProject()
-                : options.getBigQueryProject(),
-            queryJobId,
-            queryTempDatasetOpt);
-
-    boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
-    // Create dataset only if it has not been set by the user
-    if (beamToCreateTempDataset) {
-      LOG.info("Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
-
-      tableService.createDataset(
-          queryResultTable.getProjectId(),
-          queryResultTable.getDatasetId(),
-          effectiveLocation,
-          "Temporary tables for query results of job " + options.getJobName(),
-          TimeUnit.DAYS.toMillis(1));
-    } else { // If the user specified a temp dataset, check that the destination table does not
-      // exist
-      Table destTable = tableService.getTable(queryResultTable);
-      checkArgument(
-          destTable == null,
-          "Refusing to write on existing table {} in the specified temp dataset {}",
-          queryResultTable.getTableId(),
-          queryResultTable.getDatasetId());
-    }
+      // Step 2: Create a temporary dataset in the query location only if the user has not specified
+      // a temp dataset.
+      String queryJobId =
+          BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
+      Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
+      TableReference queryResultTable =
+          createTempTableReference(
+              options.getBigQueryProject() == null
+                  ? options.getProject()
+                  : options.getBigQueryProject(),
+              queryJobId,
+              queryTempDatasetOpt);
+
+      boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
+      // Create dataset only if it has not been set by the user
+      if (beamToCreateTempDataset) {
+        LOG.info(
+            "Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
+
+        tableService.createDataset(
+            queryResultTable.getProjectId(),
+            queryResultTable.getDatasetId(),
+            effectiveLocation,
+            "Temporary tables for query results of job " + options.getJobName(),
+            TimeUnit.DAYS.toMillis(1));
+      } else { // If the user specified a temp dataset, check that the destination table does not
+        // exist
+        Table destTable = tableService.getTable(queryResultTable);
+        checkArgument(
+            destTable == null,
+            "Refusing to write on existing table {} in the specified temp dataset {}",
+            queryResultTable.getTableId(),
+            queryResultTable.getDatasetId());
+      }
 
-    // Step 3: Execute the query. Generate a transient (random) query job ID, because this code may
-    // be retried after the temporary dataset and table have been deleted by a previous attempt --
-    // in that case, we want to regenerate the temporary dataset and table, and we'll need a fresh
-    // query ID to do that.
-    LOG.info(
-        "Exporting query results into temporary table {} using job {}",
-        queryResultTable,
-        queryJobId);
-
-    JobReference jobReference =
-        new JobReference()
-            .setProjectId(
-                options.getBigQueryProject() == null
-                    ? options.getProject()
-                    : options.getBigQueryProject())
-            .setLocation(effectiveLocation)
-            .setJobId(queryJobId);
-
-    JobConfigurationQuery queryConfiguration =
-        createBasicQueryConfig(query, flattenResults, useLegacySql)
-            .setAllowLargeResults(true)
-            .setDestinationTable(queryResultTable)
-            .setCreateDisposition("CREATE_IF_NEEDED")
-            .setWriteDisposition("WRITE_TRUNCATE")
-            .setPriority(priority.name());
-
-    if (kmsKey != null) {
-      queryConfiguration.setDestinationEncryptionConfiguration(
-          new EncryptionConfiguration().setKmsKeyName(kmsKey));
-    }
+      // Step 3: Execute the query. Generate a transient (random) query job ID, because this code
+      // may be retried after the temporary dataset and table have been deleted by a previous
+      // attempt 
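
The core of the change in the diff above is wrapping `DatasetService` in try-with-resources so its `close()` runs on every exit path. A minimal self-contained sketch of that guarantee, using `FakeDatasetService` as a hypothetical stand-in for Beam's actual service class:

```java
// Sketch of the try-with-resources pattern applied in the diff above.
// FakeDatasetService is a hypothetical stand-in, not Beam's DatasetService;
// the point is that close() runs on every exit path, normal or exceptional.
class FakeDatasetService implements AutoCloseable {
    boolean closed = false;

    String getLocation() {
        return "US";
    }

    @Override
    public void close() {
        closed = true; // in the real service this would shut down gRPC channels
    }
}

class TryWithResourcesDemo {
    static FakeDatasetService run(boolean fail) {
        FakeDatasetService service = new FakeDatasetService();
        try (FakeDatasetService s = service) {
            s.getLocation();
            if (fail) {
                throw new RuntimeException("query failed");
            }
        } catch (RuntimeException e) {
            // swallowed for the demo; close() already ran before we got here
        }
        return service;
    }

    public static void main(String[] args) {
        System.out.println(run(false).closed); // true
        System.out.println(run(true).closed);  // true: closed even on failure
    }
}
```

This is why the diff indents Steps 2 and 3 into the `try` block: everything that uses `tableService` must finish before the service (and its underlying channels) is released.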

Issue Time Tracking
-------------------

    Worklog Id:     (was: 758213)
    Time Spent: 7h 20m  (was: 7h 10m)

> BigQueryWriteClient in DatasetServiceImpl is not closed, which causes 
> "ManagedChannel allocation site" exceptions
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12356
>                 URL: https://issues.apache.org/jira/browse/BEAM-12356
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.29.0, 2.32.0, 2.33.0
>            Reporter: Minbo Bae
>            Assignee: Reuven Lax
>            Priority: P2
>             Fix For: 2.34.0
>
>         Attachments: bigquery_grpc.log
>
>          Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> [BigQueryWriteClient|https://github.com/apache/beam/blob/v2.29.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L461]
>  in DatasetServiceImpl (added in [https://github.com/apache/beam/pull/14309]) 
> is not closed. This causes error logs during gRPC orphan-channel cleanup; 
> see "bigquery_grpc.log" in the attachments, which was captured from GCP Dataflow. 
> I don't think this issue affects pipeline runs beyond the error logs, but 
> could you take a look?
> A similar issue was reported for {{CloudBigtableIO}} at 
> [https://github.com/googleapis/java-bigtable-hbase/issues/2658]
>  
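
The fix the report asks for sits on the implementer side: a service that constructs a client owns that client, so the service's own `close()` must close it, or the client's `ManagedChannel` is orphaned and gRPC logs the allocation-site warning at cleanup time. A hedged sketch of that ownership rule (all names hypothetical, not Beam's actual API):

```java
// Hypothetical sketch of the implementer-side fix: a DatasetServiceImpl-like
// class owns a BigQueryWriteClient-like resource, so its close() must
// propagate to the client; otherwise the client's channel leaks and gRPC
// reports a "ManagedChannel allocation site" warning.
class FakeWriteClient implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true; // the real client would shut down its ManagedChannel here
    }
}

class DatasetServiceSketch implements AutoCloseable {
    final FakeWriteClient writeClient = new FakeWriteClient();

    @Override
    public void close() {
        writeClient.close(); // owner closes what it created
    }
}

class CloseDemo {
    public static void main(String[] args) {
        DatasetServiceSketch service = new DatasetServiceSketch();
        try (DatasetServiceSketch s = service) {
            // use the service...
        }
        System.out.println(service.writeClient.closed); // true
    }
}
```

Together with the caller-side try-with-resources in the PR diff, this gives a complete close chain: caller closes the service, the service closes its client, and the client releases its channel.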



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
