This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 840faea2262 Create option to specify temp query project, and wire into source tab… (#31128)
840faea2262 is described below

commit 840faea2262f754e44e1cec0d75954c4bc44da43
Author: johnjcasey <[email protected]>
AuthorDate: Thu May 16 12:41:32 2024 -0400

    Create option to specify temp query project, and wire into source tab… (#31128)
    
    * Create option to specify temp query project, and wire into source table references
    
    * add default values to test cases
    
    * checkstyle
    
    * fix translation test
    
    * add test case
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 39 +++++++++++++++++-----
 .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 10 +++++-
 .../sdk/io/gcp/bigquery/BigQueryQueryHelper.java   | 15 +++++----
 .../io/gcp/bigquery/BigQueryQuerySourceDef.java    | 26 ++++++++++++---
 .../gcp/bigquery/BigQueryStorageQuerySource.java   |  8 +++++
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java    | 26 ++++++++++++++-
 .../gcp/bigquery/BigQueryIOStorageQueryTest.java   |  6 ++++
 .../io/gcp/bigquery/BigQueryIOTranslationTest.java |  1 +
 8 files changed, 111 insertions(+), 20 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index fce8f1c5d40..1238271c791 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -969,6 +969,8 @@ public class BigQueryIO {
 
       abstract Builder<T> setQueryTempDataset(String queryTempDataset);
 
+      abstract Builder<T> setQueryTempProject(String queryTempProject);
+
       abstract Builder<T> setMethod(TypedRead.Method method);
 
       abstract Builder<T> setFormat(DataFormat method);
@@ -1029,6 +1031,8 @@ public class BigQueryIO {
 
     abstract @Nullable String getQueryTempDataset();
 
+    abstract @Nullable String getQueryTempProject();
+
     public abstract TypedRead.Method getMethod();
 
     abstract DataFormat getFormat();
@@ -1109,6 +1113,7 @@ public class BigQueryIO {
                 MoreObjects.firstNonNull(getQueryPriority(), 
QueryPriority.BATCH),
                 getQueryLocation(),
                 getQueryTempDataset(),
+                getQueryTempProject(),
                 getKmsKey());
       }
       return sourceDef;
@@ -1124,6 +1129,7 @@ public class BigQueryIO {
           MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),
           getQueryLocation(),
           getQueryTempDataset(),
+          getQueryTempProject(),
           getKmsKey(),
           getFormat(),
           getParseFn(),
@@ -1203,12 +1209,16 @@ public class BigQueryIO {
               // The temp table is only used for dataset and project id 
validation, not for table
               // name
               // validation
+              String project = getQueryTempProject();
+              if (project == null) {
+                project =
+                    bqOptions.getBigQueryProject() == null
+                        ? bqOptions.getProject()
+                        : bqOptions.getBigQueryProject();
+              }
               TableReference tempTable =
                   new TableReference()
-                      .setProjectId(
-                          bqOptions.getBigQueryProject() == null
-                              ? bqOptions.getProject()
-                              : bqOptions.getBigQueryProject())
+                      .setProjectId(project)
                       .setDatasetId(getQueryTempDataset())
                       .setTableId("dummy table");
               BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
@@ -1602,12 +1612,16 @@ public class BigQueryIO {
               String jobUuid = c.getJobId();
 
               Optional<String> queryTempDataset = 
Optional.ofNullable(getQueryTempDataset());
-
+              String project = getQueryTempProject();
+              if (project == null) {
+                project =
+                    options.getBigQueryProject() == null
+                        ? options.getProject()
+                        : options.getBigQueryProject();
+              }
               TableReference tempTable =
                   createTempTableReference(
-                      options.getBigQueryProject() == null
-                          ? options.getProject()
-                          : options.getBigQueryProject(),
+                      project,
                       BigQueryResourceNaming.createJobIdPrefix(
                           options.getJobName(), jobUuid, JobType.QUERY),
                       queryTempDataset);
@@ -2032,6 +2046,15 @@ public class BigQueryIO {
       return toBuilder().setQueryTempDataset(queryTempDatasetRef).build();
     }
 
+    /**
+     * Like {@link #withQueryTempDataset(String)}, but also sets the project that owns the
+     * temporary query dataset.
+     *
+     * <p>If no temp project is set, the temp table project falls back to the BigQuery project
+     * pipeline option, and then to the pipeline's project option.
+     *
+     * @param queryTempProjectRef project ID used for the temporary query table reference
+     * @param queryTempDatasetRef dataset ID used for the temporary query table reference
+     */
+    public TypedRead<T> withQueryTempProjectAndDataset(
+        String queryTempProjectRef, String queryTempDatasetRef) {
+      return toBuilder()
+          .setQueryTempProject(queryTempProjectRef)
+          .setQueryTempDataset(queryTempDatasetRef)
+          .build();
+    }
+
     /** See {@link Method}. */
     public TypedRead<T> withMethod(TypedRead.Method method) {
       return toBuilder().setMethod(method).build();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index fee79f5896c..fe6d93954ee 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -94,6 +94,7 @@ public class BigQueryIOTranslation {
             .addNullableByteArrayField("query_priority")
             .addNullableStringField("query_location")
             .addNullableStringField("query_temp_dataset")
+            .addNullableStringField("query_temp_project")
             .addNullableByteArrayField("method")
             .addNullableByteArrayField("format")
             .addNullableArrayField("selected_fields", FieldType.STRING)
@@ -160,6 +161,9 @@ public class BigQueryIOTranslation {
       if (transform.getQueryTempDataset() != null) {
         fieldValues.put("query_temp_dataset", transform.getQueryTempDataset());
       }
+      if (transform.getQueryTempProject() != null) {
+        fieldValues.put("query_temp_project", transform.getQueryTempProject());
+      }
       if (transform.getMethod() != null) {
         fieldValues.put("method", toByteArray(transform.getMethod()));
       }
@@ -270,6 +274,10 @@ public class BigQueryIOTranslation {
         if (queryTempDataset != null) {
           builder = builder.setQueryTempDataset(queryTempDataset);
         }
+        String queryTempProject = configRow.getString("query_temp_project");
+        if (queryTempProject != null) {
+          builder = builder.setQueryTempProject(queryTempProject);
+        }
         byte[] methodBytes = configRow.getBytes("method");
         if (methodBytes != null) {
           builder = builder.setMethod((TypedRead.Method) 
fromByteArray(methodBytes));
@@ -576,7 +584,7 @@ public class BigQueryIOTranslation {
             "deterministic_record_id_fn", 
toByteArray(transform.getDeterministicRecordIdFn()));
       }
       if (transform.getWriteTempDataset() != null) {
-        fieldValues.put("write_temp_dataset", 
toByteArray(transform.getDeterministicRecordIdFn()));
+        fieldValues.put("write_temp_dataset", 
toByteArray(transform.getWriteTempDataset()));
       }
       if (transform.getRowMutationInformationFn() != null) {
         fieldValues.put(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
index be49777417e..f5334a41cdc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
@@ -92,6 +92,7 @@ class BigQueryQueryHelper {
       QueryPriority priority,
       @Nullable String location,
       @Nullable String queryTempDatasetId,
+      @Nullable String queryTempProjectId,
       @Nullable String kmsKey)
       throws InterruptedException, IOException {
     // Step 1: Find the effective location of the query.
@@ -128,13 +129,15 @@ class BigQueryQueryHelper {
           BigQueryResourceNaming.createJobIdPrefix(
               options.getJobName(), stepUuid, JobType.QUERY, 
BigQueryHelpers.randomUUIDString());
       Optional<String> queryTempDatasetOpt = 
Optional.ofNullable(queryTempDatasetId);
+      String project = queryTempProjectId;
+      if (project == null) {
+        project =
+            options.getBigQueryProject() == null
+                ? options.getProject()
+                : options.getBigQueryProject();
+      }
       TableReference queryResultTable =
-          createTempTableReference(
-              options.getBigQueryProject() == null
-                  ? options.getProject()
-                  : options.getBigQueryProject(),
-              tempTableID,
-              queryTempDatasetOpt);
+          createTempTableReference(project, tempTableID, queryTempDatasetOpt);
 
       boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
       // Create dataset only if it has not been set by the user
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 585b58aa366..b4035a4e9ac 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -47,6 +47,7 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
   private final BigQueryIO.TypedRead.QueryPriority priority;
   private final String location;
   private final String tempDatasetId;
+  private final String tempProjectId;
   private final String kmsKey;
 
   private transient AtomicReference<@Nullable JobStatistics> dryRunJobStats;
@@ -59,9 +60,18 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
       BigQueryIO.TypedRead.QueryPriority priority,
       String location,
       String tempDatasetId,
+      String tempProjectId,
       String kmsKey) {
     return new BigQueryQuerySourceDef(
-        bqServices, query, flattenResults, useLegacySql, priority, location, 
tempDatasetId, kmsKey);
+        bqServices,
+        query,
+        flattenResults,
+        useLegacySql,
+        priority,
+        location,
+        tempDatasetId,
+        tempProjectId,
+        kmsKey);
   }
 
   private BigQueryQuerySourceDef(
@@ -72,6 +82,7 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
       BigQueryIO.TypedRead.QueryPriority priority,
       String location,
       String tempDatasetId,
+      String tempProjectId,
       String kmsKey) {
     this.query = checkNotNull(query, "query");
     this.flattenResults = checkNotNull(flattenResults, "flattenResults");
@@ -80,6 +91,7 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
     this.priority = priority;
     this.location = location;
     this.tempDatasetId = tempDatasetId;
+    this.tempProjectId = tempProjectId;
     this.kmsKey = kmsKey;
     dryRunJobStats = new AtomicReference<>();
   }
@@ -121,16 +133,22 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
         priority,
         location,
         tempDatasetId,
+        tempProjectId,
         kmsKey);
   }
 
   void cleanupTempResource(BigQueryOptions bqOptions, String stepUuid) throws 
Exception {
     Optional<String> queryTempDatasetOpt = Optional.ofNullable(tempDatasetId);
+    String project = tempProjectId;
+    if (project == null) {
+      project =
+          bqOptions.getBigQueryProject() == null
+              ? bqOptions.getProject()
+              : bqOptions.getBigQueryProject();
+    }
     TableReference tableToRemove =
         createTempTableReference(
-            bqOptions.getBigQueryProject() == null
-                ? bqOptions.getProject()
-                : bqOptions.getBigQueryProject(),
+            project,
             BigQueryResourceNaming.createJobIdPrefix(
                 bqOptions.getJobName(), stepUuid, JobType.QUERY),
             queryTempDatasetOpt);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 2d87c7c1c7d..a2350ef19a7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -46,6 +46,7 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
       QueryPriority priority,
       @Nullable String location,
       @Nullable String queryTempDataset,
+      @Nullable String queryTempProject,
       @Nullable String kmsKey,
       @Nullable DataFormat format,
       SerializableFunction<SchemaAndRecord, T> parseFn,
@@ -59,6 +60,7 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
         priority,
         location,
         queryTempDataset,
+        queryTempProject,
         kmsKey,
         format,
         parseFn,
@@ -85,6 +87,7 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
         priority,
         location,
         null,
+        null,
         kmsKey,
         null,
         parseFn,
@@ -99,6 +102,8 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
   private final QueryPriority priority;
   private final @Nullable String location;
   private final @Nullable String queryTempDataset;
+
+  private final @Nullable String queryTempProject;
   private final @Nullable String kmsKey;
 
   private transient AtomicReference<@Nullable JobStatistics> dryRunJobStats;
@@ -111,6 +116,7 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
       QueryPriority priority,
       @Nullable String location,
       @Nullable String queryTempDataset,
+      @Nullable String queryTempProject,
       @Nullable String kmsKey,
       @Nullable DataFormat format,
       SerializableFunction<SchemaAndRecord, T> parseFn,
@@ -124,6 +130,7 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
     this.priority = checkNotNull(priority, "priority");
     this.location = location;
     this.queryTempDataset = queryTempDataset;
+    this.queryTempProject = queryTempProject;
     this.kmsKey = kmsKey;
     this.dryRunJobStats = new AtomicReference<>();
   }
@@ -170,6 +177,7 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
             priority,
             location,
             queryTempDataset,
+            queryTempProject,
             kmsKey);
     try (DatasetService datasetService = 
bqServices.getDatasetService(options)) {
       return datasetService.getTable(queryResultTable);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index bc75ba8bd9b..b290cee89e2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -301,7 +301,16 @@ public class BigQueryIOReadTest implements Serializable {
 
  private void checkTypedReadQueryObject(
      BigQueryIO.TypedRead<?> read, String query, String kmsKey, String 
tempDataset) {
-    checkTypedReadQueryObjectWithValidate(read, query, kmsKey, tempDataset, 
true);
+    checkTypedReadQueryObjectWithValidate(read, query, kmsKey, tempDataset, 
null, true);
+  }
+
+  /** Same as the four-argument overload, but also asserts the expected query temp project. */
+  private void checkTypedReadQueryObject(
+      BigQueryIO.TypedRead<?> read,
+      String query,
+      String kmsKey,
+      String tempDataset,
+      String tempProject) {
+    checkTypedReadQueryObjectWithValidate(read, query, kmsKey, tempDataset, 
tempProject, true);
   }
 
   private void checkReadTableObjectWithValidate(
@@ -325,11 +334,13 @@ public class BigQueryIOReadTest implements Serializable {
       String query,
       String kmsKey,
       String tempDataset,
+      String tempProject,
       boolean validate) {
     assertNull(read.getTable());
     assertEquals(query, read.getQuery().get());
     assertEquals(kmsKey, read.getKmsKey());
     assertEquals(tempDataset, read.getQueryTempDataset());
+    assertEquals(tempProject, read.getQueryTempProject());
     assertEquals(validate, read.getValidate());
   }
 
@@ -396,6 +407,16 @@ public class BigQueryIOReadTest implements Serializable {
     checkTypedReadQueryObject(read, "foo_query", "kms_key", "temp_dataset");
   }
 
+  /** The temp project and dataset passed together are both stored on the read transform. */
+  @Test
+  public void testBuildQueryBasedTypedReadSourceWithTempProject() {
+    BigQueryIO.TypedRead<?> read =
+        BigQueryIO.readTableRows()
+            .fromQuery("foo_query")
+            .withKmsKey("kms_key")
+            .withQueryTempProjectAndDataset("temp_project", "temp_dataset");
+    checkTypedReadQueryObject(read, "foo_query", "kms_key", "temp_dataset", 
"temp_project");
+  }
+
   @Test
   @ProjectOverride
   public void testValidateReadSetsBigQueryProject() throws Exception {
@@ -911,6 +932,7 @@ public class BigQueryIOReadTest implements Serializable {
                 QueryPriority.BATCH,
                 null,
                 null,
+                null,
                 null)
             .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, 
false);
 
@@ -987,6 +1009,7 @@ public class BigQueryIOReadTest implements Serializable {
                 QueryPriority.BATCH,
                 null,
                 null,
+                null,
                 null)
             .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, 
false);
 
@@ -1059,6 +1082,7 @@ public class BigQueryIOReadTest implements Serializable {
                 QueryPriority.BATCH,
                 null,
                 null,
+                null,
                 null)
             .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, 
false);
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index 0c5325286dd..497653f9ab8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -307,6 +307,7 @@ public class BigQueryIOStorageQueryTest {
             /* priority = */ QueryPriority.INTERACTIVE,
             /* location = */ null,
             /* queryTempDataset = */ null,
+            /* queryTempProject = */ null,
             /* kmsKey = */ null,
             null,
             new TableRowParser(),
@@ -419,6 +420,7 @@ public class BigQueryIOStorageQueryTest {
             /* priority = */ QueryPriority.BATCH,
             /* location = */ null,
             /* queryTempDataset = */ null,
+            /* queryTempProject = */ null,
             /* kmsKey = */ null,
             null,
             new TableRowParser(),
@@ -519,6 +521,7 @@ public class BigQueryIOStorageQueryTest {
             /* priority = */ QueryPriority.BATCH,
             /* location = */ null,
             /* queryTempDataset = */ null,
+            /* queryTempProject = */ null,
             /* kmsKey = */ null,
             null,
             new TableRowParser(),
@@ -666,6 +669,7 @@ public class BigQueryIOStorageQueryTest {
             /* priority = */ QueryPriority.BATCH,
             /* location = */ null,
             /* queryTempDataset = */ null,
+            /* queryTempProject = */ null,
             /* kmsKey = */ null,
             DataFormat.AVRO,
             new TableRowParser(),
@@ -737,6 +741,7 @@ public class BigQueryIOStorageQueryTest {
             /* priority = */ QueryPriority.BATCH,
             /* location = */ null,
             /* queryTempDataset = */ null,
+            /* queryTempProject = */ null,
             /* kmsKey = */ null,
             null,
             new TableRowParser(),
@@ -761,6 +766,7 @@ public class BigQueryIOStorageQueryTest {
             /* priority = */ QueryPriority.INTERACTIVE,
             /* location = */ "asia-northeast1",
             /* queryTempDataset = */ null,
+            /* queryTempProject = */ null,
             /* kmsKey = */ null,
             null,
             new TableRowParser(),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index 3654304f439..950ebcaafe4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -60,6 +60,7 @@ public class BigQueryIOTranslationTest {
     READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryPriority", "query_priority");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryLocation", "query_location");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryTempDataset", 
"query_temp_dataset");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryTempProject", 
"query_temp_project");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getMethod", "method");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getFormat", "format");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getSelectedFields", "selected_fields");

Reply via email to