[ 
https://issues.apache.org/jira/browse/BEAM-3774?focusedWorklogId=87767&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87767
 ]

ASF GitHub Bot logged work on BEAM-3774:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Apr/18 21:20
            Start Date: 04/Apr/18 21:20
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #5001: [BEAM-3774] Adds 
support for reading from/writing to more BQ geographical locations
URL: https://github.com/apache/beam/pull/5001
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, the diff is
reproduced below for the sake of provenance:

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 96a06229713..29b405bf368 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -20,6 +20,7 @@
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
@@ -203,12 +204,28 @@ static void verifyDatasetPresence(DatasetService 
datasetService, TableReference
       } else {
         throw new RuntimeException(
             String.format(
-                UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", 
toTableSpec(table)),
-            e);
+                UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", 
toTableSpec(table)), e);
       }
     }
   }
 
+  static String getDatasetLocation(
+      DatasetService datasetService, String projectId, String datasetId) {
+    Dataset dataset;
+    try {
+      dataset = datasetService.getDataset(projectId, datasetId);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException(
+          String.format(
+              "unable to obtain dataset for dataset %s in project %s", 
datasetId, projectId),
+          e);
+    }
+    return dataset.getLocation();
+  }
+
   static void verifyTablePresence(DatasetService datasetService, 
TableReference table) {
     try {
       datasetService.getTable(table);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 88de9b4e505..fab238cb788 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -170,6 +170,13 @@
  *     .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
  * }</pre>
  *
+ * <p>Users can optionally specify a query priority using {@link 
TypedRead#withQueryPriority(
+ * TypedRead.QueryPriority)} and a geographic location where the query will be 
executed using {@link
+ * TypedRead#withQueryLocation(String)}. Query location must be specified for 
jobs that are not
+ * executed in US or EU. See <a
+ * 
href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query";>BigQuery
 Jobs:
+ * query</a>.
+ *
  * <h3>Writing</h3>
  *
  * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} 
transformation. This consumes a
@@ -549,6 +556,7 @@ public Read withTemplateCompatibility() {
       abstract Builder<T> setWithTemplateCompatibility(Boolean 
useTemplateCompatibility);
       abstract Builder<T> setBigQueryServices(BigQueryServices 
bigQueryServices);
       abstract Builder<T> setQueryPriority(QueryPriority priority);
+      abstract Builder<T> setQueryLocation(String location);
       abstract TypedRead<T> build();
 
       abstract Builder<T> setParseFn(
@@ -570,6 +578,8 @@ public Read withTemplateCompatibility() {
 
     @Nullable abstract QueryPriority getQueryPriority();
 
+    @Nullable abstract String getQueryLocation();
+
     @Nullable abstract Coder<T> getCoder();
 
     /**
@@ -632,7 +642,8 @@ public Read withTemplateCompatibility() {
                 getBigQueryServices(),
                 coder,
                 getParseFn(),
-                MoreObjects.firstNonNull(getQueryPriority(), 
QueryPriority.BATCH));
+                MoreObjects.firstNonNull(getQueryPriority(), 
QueryPriority.BATCH),
+                getQueryLocation());
       }
       return source;
     }
@@ -687,7 +698,8 @@ public void validate(PipelineOptions options) {
                 new JobConfigurationQuery()
                     .setQuery(getQuery().get())
                     .setFlattenResults(getFlattenResults())
-                    .setUseLegacySql(getUseLegacySql()));
+                    .setUseLegacySql(getUseLegacySql()),
+                getQueryLocation());
           } catch (Exception e) {
             throw new IllegalArgumentException(
                 String.format(QUERY_VALIDATION_FAILURE_ERROR, 
getQuery().get()), e);
@@ -939,6 +951,18 @@ public TableReference getTable() {
       return toBuilder().setQueryPriority(priority).build();
     }
 
+    /**
+     * BigQuery geographic location where the query <a
+     * 
href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs";>job</a> 
will be
+     * executed. If not specified, Beam tries to determine the location by 
examining the tables
+     * referenced by the query. Location must be specified for queries not 
executed in US or EU. See
+     * <a 
href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query";>BigQuery
 Jobs:
+     * query</a>.
+     */
+    public TypedRead<T> withQueryLocation(String location) {
+      return toBuilder().setQueryLocation(location).build();
+    }
+
     @Experimental(Experimental.Kind.SOURCE_SINK)
     public TypedRead<T> withTemplateCompatibility() {
       return toBuilder().setWithTemplateCompatibility(true).build();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 34d7c68f8fa..f380b7d391b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -59,9 +59,18 @@
       BigQueryServices bqServices,
       Coder<T> coder,
       SerializableFunction<SchemaAndRecord, T> parseFn,
-      QueryPriority priority) {
+      QueryPriority priority,
+      String location) {
     return new BigQueryQuerySource<>(
-        stepUuid, query, flattenResults, useLegacySql, bqServices, coder, 
parseFn, priority);
+        stepUuid,
+        query,
+        flattenResults,
+        useLegacySql,
+        bqServices,
+        coder,
+        parseFn,
+        priority,
+        location);
   }
 
   private final ValueProvider<String> query;
@@ -69,6 +78,7 @@
   private final Boolean useLegacySql;
   private transient AtomicReference<JobStatistics> dryRunJobStats;
   private final QueryPriority priority;
+  private final String location;
 
   private BigQueryQuerySource(
       String stepUuid,
@@ -78,13 +88,15 @@ private BigQueryQuerySource(
       BigQueryServices bqServices,
       Coder<T> coder,
       SerializableFunction<SchemaAndRecord, T> parseFn,
-      QueryPriority priority) {
+      QueryPriority priority,
+      String location) {
     super(stepUuid, bqServices, coder, parseFn);
     this.query = checkNotNull(query, "query");
     this.flattenResults = checkNotNull(flattenResults, "flattenResults");
     this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
     this.dryRunJobStats = new AtomicReference<>();
     this.priority = priority;
+    this.location = location;
   }
 
   @Override
@@ -97,13 +109,17 @@ public long getEstimatedSizeBytes(PipelineOptions options) 
throws Exception {
   protected TableReference getTableToExtract(BigQueryOptions bqOptions)
       throws IOException, InterruptedException {
     // 1. Find the location of the query.
-    String location = null;
-    List<TableReference> referencedTables =
-        dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
+    String location = this.location;
     DatasetService tableService = bqServices.getDatasetService(bqOptions);
-    if (referencedTables != null && !referencedTables.isEmpty()) {
-      TableReference queryTable = referencedTables.get(0);
-      location = tableService.getTable(queryTable).getLocation();
+    if (location == null) {
+      // If location was not provided we try to determine it from the tables 
referenced by the
+      // Query. This will only work for BQ locations US and EU.
+      List<TableReference> referencedTables =
+          dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
+      if (referencedTables != null && !referencedTables.isEmpty()) {
+        TableReference queryTable = referencedTables.get(0);
+        location = tableService.getTable(queryTable).getLocation();
+      }
     }
 
     String jobIdToken = createJobIdToken(bqOptions.getJobName(), stepUuid);
@@ -125,7 +141,8 @@ protected TableReference getTableToExtract(BigQueryOptions 
bqOptions)
 
     // 3. Execute the query.
     executeQuery(
-        jobIdToken, bqOptions.getProject(), tableToExtract, 
bqServices.getJobService(bqOptions));
+        jobIdToken, bqOptions.getProject(), tableToExtract, 
bqServices.getJobService(bqOptions),
+        location);
 
     return tableToExtract;
   }
@@ -151,8 +168,10 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
   private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions 
bqOptions)
       throws InterruptedException, IOException {
     if (dryRunJobStats.get() == null) {
-      JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
-          bqOptions.getProject(), createBasicQueryConfig());
+      JobStatistics jobStats =
+          bqServices
+              .getJobService(bqOptions)
+              .dryRunQuery(bqOptions.getProject(), createBasicQueryConfig(), 
this.location);
       dryRunJobStats.compareAndSet(null, jobStats);
     }
     return dryRunJobStats.get();
@@ -162,7 +181,8 @@ private void executeQuery(
       String jobIdToken,
       String executingProject,
       TableReference destinationTable,
-      JobService jobService) throws IOException, InterruptedException {
+      JobService jobService,
+      String bqLocation) throws IOException, InterruptedException {
     // Generate a transient (random) query job ID, because this code may be 
retried after the
     // temporary dataset and table have already been deleted by a previous 
attempt -
     // in that case we want to re-generate the temporary dataset and table, 
and we'll need
@@ -174,9 +194,11 @@ private void executeQuery(
         destinationTable,
         queryJobId);
 
-    JobReference jobRef = new JobReference()
-        .setProjectId(executingProject)
-        .setJobId(queryJobId);
+    JobReference jobRef =
+        new JobReference()
+            .setProjectId(executingProject)
+            .setLocation(bqLocation)
+            .setJobId(queryJobId);
 
     JobConfigurationQuery queryConfig = createBasicQueryConfig()
         .setAllowLargeResults(true)
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index dde005df074..1295cc0fe2c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -85,7 +85,7 @@ Job pollJob(JobReference jobRef, int maxAttempts)
     /**
      * Dry runs the query in the given project.
      */
-    JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
queryConfig)
+    JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
queryConfig, String location)
         throws InterruptedException, IOException;
 
     /**
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 6c766888bb4..9771733bbd5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -254,7 +254,9 @@ Job pollJob(
         BackOff backoff) throws InterruptedException {
       do {
         try {
-          Job job = client.jobs().get(jobRef.getProjectId(), 
jobRef.getJobId()).execute();
+          Job job = client.jobs().get(
+                  jobRef.getProjectId(), jobRef.getJobId()).setLocation(
+                          jobRef.getLocation()).execute();
           JobStatus status = job.getStatus();
           if (status != null && status.getState() != null && 
status.getState().equals("DONE")) {
             LOG.info("BigQuery job {} completed in state DONE", jobRef);
@@ -281,9 +283,11 @@ private static String formatBqStatusCommand(String 
projectId, String jobId) {
     }
 
     @Override
-    public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
queryConfig)
+    public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
queryConfig,
+                                     String location)
         throws InterruptedException, IOException {
-      Job job = new Job()
+      JobReference jobRef = new 
JobReference().setLocation(location).setProjectId(projectId);
+      Job job = new Job().setJobReference(jobRef)
           .setConfiguration(new JobConfiguration()
               .setQuery(queryConfig)
               .setDryRun(true));
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index a15afed8f0b..6d82c569265 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -101,7 +101,8 @@ public ExtractResult(TableSchema schema, List<ResourceId> 
extractedFiles) {
   protected ExtractResult extractFiles(PipelineOptions options) throws 
Exception {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     TableReference tableToExtract = getTableToExtract(bqOptions);
-    Table table = 
bqServices.getDatasetService(bqOptions).getTable(tableToExtract);
+    BigQueryServices.DatasetService datasetService = 
bqServices.getDatasetService(bqOptions);
+    Table table = datasetService.getTable(tableToExtract);
     if (table == null) {
       throw new IOException(String.format(
               "Cannot start an export job since table %s does not exist",
@@ -113,13 +114,17 @@ protected ExtractResult extractFiles(PipelineOptions 
options) throws Exception {
     String extractJobId = 
getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
     final String extractDestinationDir =
         resolveTempLocation(bqOptions.getTempLocation(), 
"BigQueryExtractTemp", stepUuid);
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, tableToExtract.getProjectId(), 
tableToExtract.getDatasetId());
     List<ResourceId> tempFiles =
         executeExtract(
             extractJobId,
             tableToExtract,
             jobService,
             bqOptions.getProject(),
-            extractDestinationDir);
+            extractDestinationDir,
+            bqLocation);
     return new ExtractResult(schema, tempFiles);
   }
 
@@ -160,11 +165,11 @@ public void validate() {
 
   private List<ResourceId> executeExtract(
       String jobId, TableReference table, JobService jobService, String 
executingProject,
-      String extractDestinationDir)
+      String extractDestinationDir, String bqLocation)
           throws InterruptedException, IOException {
-    JobReference jobRef = new JobReference()
-        .setProjectId(executingProject)
-        .setJobId(jobId);
+
+    JobReference jobRef =
+        new 
JobReference().setProjectId(executingProject).setLocation(bqLocation).setJobId(jobId);
 
     String destinationUri = 
BigQueryIO.getExtractDestinationUri(extractDestinationDir);
     JobConfigurationExtract extract = new JobConfigurationExtract()
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 339003d4575..cd128a1dfe5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -228,8 +228,6 @@ public WriteTables(
     return writeTablesOutputs.get(mainOutputTag);
   }
 
-
-
   private void load(
       JobService jobService,
       DatasetService datasetService,
@@ -255,13 +253,20 @@ private void load(
     }
     String projectId = ref.getProjectId();
     Job lastFailedLoadJob = null;
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), 
ref.getDatasetId());
     for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
       String jobId = jobIdPrefix + "-" + i;
-      JobReference jobRef = new 
JobReference().setProjectId(projectId).setJobId(jobId);
+
+      JobReference jobRef =
+          new 
JobReference().setProjectId(projectId).setJobId(jobId).setLocation(bqLocation);
+
       LOG.info("Loading {} files into {} using job {}, attempt {}", 
gcsUris.size(), ref, jobRef, i);
       jobService.startLoadJob(jobRef, loadConfig);
       LOG.info("Load job {} started", jobRef);
+
       Job loadJob = jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+
       Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
       switch (jobStatus) {
         case SUCCEEDED:
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 6b4cf534a01..b6fbe4905b4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -614,7 +614,8 @@ public void testBigQueryQuerySourceInitSplit() throws 
Exception {
         fakeBqServices,
         TableRowJsonCoder.of(),
         BigQueryIO.TableRowParser.INSTANCE,
-        QueryPriority.BATCH);
+        QueryPriority.BATCH,
+        null);
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
 
     TableReference queryTable = new TableReference()
@@ -693,7 +694,8 @@ public void testBigQueryNoTableQuerySourceInitSplit() 
throws Exception {
         fakeBqServices,
         TableRowJsonCoder.of(),
         BigQueryIO.TableRowParser.INSTANCE,
-        QueryPriority.BATCH);
+        QueryPriority.BATCH,
+        null);
 
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index e1edd83f5c9..ac715a32be7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -228,7 +228,7 @@ public void expectDryRunQuery(String projectId, String 
query, JobStatistics resu
   }
 
   @Override
-  public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
query)
+  public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
query, String location)
       throws InterruptedException, IOException {
     synchronized (dryRunQueryResults) {
       JobStatistics result = dryRunQueryResults.get(projectId, 
query.getQuery());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 87767)
    Time Spent: 1h 50m  (was: 1h 40m)

> Update BigQuery jobs to explicitly specify the region
> -----------------------------------------------------
>
>                 Key: BEAM-3774
>                 URL: https://issues.apache.org/jira/browse/BEAM-3774
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Chamikara Jayalath
>            Assignee: Chamikara Jayalath
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> This is needed to support BQ regions other than US and EU. Region can be 
> obtained by a Dataset.get() request so no need to update the user API.
> Both Python and Java SDKs have to be updated.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to