[
https://issues.apache.org/jira/browse/BEAM-12356?focusedWorklogId=758213&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758213
]
ASF GitHub Bot logged work on BEAM-12356:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Apr/22 02:28
Start Date: 19/Apr/22 02:28
Worklog Time Spent: 10m
Work Description: reuvenlax commented on code in PR #17382:
URL: https://github.com/apache/beam/pull/17382#discussion_r852542834
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java:
##########
@@ -96,105 +96,112 @@ public static TableReference executeQuery(
throws InterruptedException, IOException {
// Step 1: Find the effective location of the query.
String effectiveLocation = location;
- DatasetService tableService = bqServices.getDatasetService(options);
- if (effectiveLocation == null) {
- List<TableReference> referencedTables =
- dryRunQueryIfNeeded(
- bqServices,
- options,
- dryRunJobStats,
- query,
- flattenResults,
- useLegacySql,
- location)
- .getQuery()
- .getReferencedTables();
- if (referencedTables != null && !referencedTables.isEmpty()) {
- TableReference referencedTable = referencedTables.get(0);
- effectiveLocation =
- tableService
- .getDataset(referencedTable.getProjectId(),
referencedTable.getDatasetId())
- .getLocation();
+ try (DatasetService tableService = bqServices.getDatasetService(options)) {
+ if (effectiveLocation == null) {
+ List<TableReference> referencedTables =
+ dryRunQueryIfNeeded(
+ bqServices,
+ options,
+ dryRunJobStats,
+ query,
+ flattenResults,
+ useLegacySql,
+ location)
+ .getQuery()
+ .getReferencedTables();
+ if (referencedTables != null && !referencedTables.isEmpty()) {
+ TableReference referencedTable = referencedTables.get(0);
+ effectiveLocation =
+ tableService
+ .getDataset(referencedTable.getProjectId(),
referencedTable.getDatasetId())
+ .getLocation();
+ }
}
- }
- // Step 2: Create a temporary dataset in the query location only if the
user has not specified a
- // temp dataset.
- String queryJobId =
- BigQueryResourceNaming.createJobIdPrefix(options.getJobName(),
stepUuid, JobType.QUERY);
- Optional<String> queryTempDatasetOpt =
Optional.ofNullable(queryTempDatasetId);
- TableReference queryResultTable =
- createTempTableReference(
- options.getBigQueryProject() == null
- ? options.getProject()
- : options.getBigQueryProject(),
- queryJobId,
- queryTempDatasetOpt);
-
- boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
- // Create dataset only if it has not been set by the user
- if (beamToCreateTempDataset) {
- LOG.info("Creating temporary dataset {} for query results",
queryResultTable.getDatasetId());
-
- tableService.createDataset(
- queryResultTable.getProjectId(),
- queryResultTable.getDatasetId(),
- effectiveLocation,
- "Temporary tables for query results of job " + options.getJobName(),
- TimeUnit.DAYS.toMillis(1));
- } else { // If the user specified a temp dataset, check that the
destination table does not
- // exist
- Table destTable = tableService.getTable(queryResultTable);
- checkArgument(
- destTable == null,
- "Refusing to write on existing table {} in the specified temp
dataset {}",
- queryResultTable.getTableId(),
- queryResultTable.getDatasetId());
- }
+ // Step 2: Create a temporary dataset in the query location only if the
user has not specified
+ // a temp dataset.
+ String queryJobId =
+ BigQueryResourceNaming.createJobIdPrefix(options.getJobName(),
stepUuid, JobType.QUERY);
+ Optional<String> queryTempDatasetOpt =
Optional.ofNullable(queryTempDatasetId);
+ TableReference queryResultTable =
+ createTempTableReference(
+ options.getBigQueryProject() == null
+ ? options.getProject()
+ : options.getBigQueryProject(),
+ queryJobId,
+ queryTempDatasetOpt);
+
+ boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
+ // Create dataset only if it has not been set by the user
+ if (beamToCreateTempDataset) {
+ LOG.info(
+ "Creating temporary dataset {} for query results",
queryResultTable.getDatasetId());
+
+ tableService.createDataset(
+ queryResultTable.getProjectId(),
+ queryResultTable.getDatasetId(),
+ effectiveLocation,
+ "Temporary tables for query results of job " +
options.getJobName(),
+ TimeUnit.DAYS.toMillis(1));
+ } else { // If the user specified a temp dataset, check that the
destination table does not
+ // exist
+ Table destTable = tableService.getTable(queryResultTable);
+ checkArgument(
+ destTable == null,
+ "Refusing to write on existing table {} in the specified temp
dataset {}",
+ queryResultTable.getTableId(),
+ queryResultTable.getDatasetId());
+ }
- // Step 3: Execute the query. Generate a transient (random) query job ID,
because this code may
- // be retried after the temporary dataset and table have been deleted by a
previous attempt --
- // in that case, we want to regenerate the temporary dataset and table,
and we'll need a fresh
- // query ID to do that.
- LOG.info(
- "Exporting query results into temporary table {} using job {}",
- queryResultTable,
- queryJobId);
-
- JobReference jobReference =
- new JobReference()
- .setProjectId(
- options.getBigQueryProject() == null
- ? options.getProject()
- : options.getBigQueryProject())
- .setLocation(effectiveLocation)
- .setJobId(queryJobId);
-
- JobConfigurationQuery queryConfiguration =
- createBasicQueryConfig(query, flattenResults, useLegacySql)
- .setAllowLargeResults(true)
- .setDestinationTable(queryResultTable)
- .setCreateDisposition("CREATE_IF_NEEDED")
- .setWriteDisposition("WRITE_TRUNCATE")
- .setPriority(priority.name());
-
- if (kmsKey != null) {
- queryConfiguration.setDestinationEncryptionConfiguration(
- new EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
+ // Step 3: Execute the query. Generate a transient (random) query job
ID, because this code
+ // may be retried after the temporary dataset and table have been
deleted by a previous
+ // attempt
Issue Time Tracking
-------------------
Worklog Id: (was: 758213)
Time Spent: 7h 20m (was: 7h 10m)
> BigQueryWriteClient in DatasetServiceImpl is not closed, which causes
> "ManagedChannel allocation site" exceptions
> -----------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-12356
> URL: https://issues.apache.org/jira/browse/BEAM-12356
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.29.0, 2.32.0, 2.33.0
> Reporter: Minbo Bae
> Assignee: Reuven Lax
> Priority: P2
> Fix For: 2.34.0
>
> Attachments: bigquery_grpc.log
>
> Time Spent: 7h 20m
> Remaining Estimate: 0h
>
> [BigQueryWriteClient|https://github.com/apache/beam/blob/v2.29.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L461]
> in DatasetServiceImpl (added at [https://github.com/apache/beam/pull/14309])
> is not closed. This causes error logs during gRPC orphan channel cleanup.
> See "bigquery_grpc.log" in the attachments, which was extracted from GCP Dataflow.
> I don't think this issue affects pipeline runs apart from the error logs, but
> could you take a look at it?
> A similar issue is reported for {{CloudBigtableIO}} at
> [https://github.com/googleapis/java-bigtable-hbase/issues/2658]
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)