This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 840faea2262 Create option to specify temp query project, and wire into
source tab… (#31128)
840faea2262 is described below
commit 840faea2262f754e44e1cec0d75954c4bc44da43
Author: johnjcasey <[email protected]>
AuthorDate: Thu May 16 12:41:32 2024 -0400
Create option to specify temp query project, and wire into source tab…
(#31128)
* Create option to specify temp query project, and wire into source table
references
* add default values to test cases
* checkstyle
* fix translation test
* add test case
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 39 +++++++++++++++++-----
.../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 10 +++++-
.../sdk/io/gcp/bigquery/BigQueryQueryHelper.java | 15 +++++----
.../io/gcp/bigquery/BigQueryQuerySourceDef.java | 26 ++++++++++++---
.../gcp/bigquery/BigQueryStorageQuerySource.java | 8 +++++
.../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 26 ++++++++++++++-
.../gcp/bigquery/BigQueryIOStorageQueryTest.java | 6 ++++
.../io/gcp/bigquery/BigQueryIOTranslationTest.java | 1 +
8 files changed, 111 insertions(+), 20 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index fce8f1c5d40..1238271c791 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -969,6 +969,8 @@ public class BigQueryIO {
abstract Builder<T> setQueryTempDataset(String queryTempDataset);
+ abstract Builder<T> setQueryTempProject(String queryTempProject);
+
abstract Builder<T> setMethod(TypedRead.Method method);
abstract Builder<T> setFormat(DataFormat method);
@@ -1029,6 +1031,8 @@ public class BigQueryIO {
abstract @Nullable String getQueryTempDataset();
+ abstract @Nullable String getQueryTempProject();
+
public abstract TypedRead.Method getMethod();
abstract DataFormat getFormat();
@@ -1109,6 +1113,7 @@ public class BigQueryIO {
MoreObjects.firstNonNull(getQueryPriority(),
QueryPriority.BATCH),
getQueryLocation(),
getQueryTempDataset(),
+ getQueryTempProject(),
getKmsKey());
}
return sourceDef;
@@ -1124,6 +1129,7 @@ public class BigQueryIO {
MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),
getQueryLocation(),
getQueryTempDataset(),
+ getQueryTempProject(),
getKmsKey(),
getFormat(),
getParseFn(),
@@ -1203,12 +1209,16 @@ public class BigQueryIO {
// The temp table is only used for dataset and project id validation, not for table
// name
// validation
+ String project = getQueryTempProject();
+ if (project == null) {
+ project =
+ bqOptions.getBigQueryProject() == null
+ ? bqOptions.getProject()
+ : bqOptions.getBigQueryProject();
+ }
TableReference tempTable =
new TableReference()
- .setProjectId(
- bqOptions.getBigQueryProject() == null
- ? bqOptions.getProject()
- : bqOptions.getBigQueryProject())
+ .setProjectId(project)
.setDatasetId(getQueryTempDataset())
.setTableId("dummy table");
BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
@@ -1602,12 +1612,16 @@ public class BigQueryIO {
String jobUuid = c.getJobId();
Optional<String> queryTempDataset =
Optional.ofNullable(getQueryTempDataset());
-
+ String project = getQueryTempProject();
+ if (project == null) {
+ project =
+ options.getBigQueryProject() == null
+ ? options.getProject()
+ : options.getBigQueryProject();
+ }
TableReference tempTable =
createTempTableReference(
- options.getBigQueryProject() == null
- ? options.getProject()
- : options.getBigQueryProject(),
+ project,
BigQueryResourceNaming.createJobIdPrefix(
options.getJobName(), jobUuid, JobType.QUERY),
queryTempDataset);
@@ -2032,6 +2046,15 @@ public class BigQueryIO {
return toBuilder().setQueryTempDataset(queryTempDatasetRef).build();
}
+ /** See {@link #withQueryTempDataset(String)}. */
+ public TypedRead<T> withQueryTempProjectAndDataset(
+ String queryTempProjectRef, String queryTempDatasetRef) {
+ return toBuilder()
+ .setQueryTempProject(queryTempProjectRef)
+ .setQueryTempDataset(queryTempDatasetRef)
+ .build();
+ }
+
/** See {@link Method}. */
public TypedRead<T> withMethod(TypedRead.Method method) {
return toBuilder().setMethod(method).build();
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index fee79f5896c..fe6d93954ee 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -94,6 +94,7 @@ public class BigQueryIOTranslation {
.addNullableByteArrayField("query_priority")
.addNullableStringField("query_location")
.addNullableStringField("query_temp_dataset")
+ .addNullableStringField("query_temp_project")
.addNullableByteArrayField("method")
.addNullableByteArrayField("format")
.addNullableArrayField("selected_fields", FieldType.STRING)
@@ -160,6 +161,9 @@ public class BigQueryIOTranslation {
if (transform.getQueryTempDataset() != null) {
fieldValues.put("query_temp_dataset", transform.getQueryTempDataset());
}
+ if (transform.getQueryTempProject() != null) {
+ fieldValues.put("query_temp_project", transform.getQueryTempProject());
+ }
if (transform.getMethod() != null) {
fieldValues.put("method", toByteArray(transform.getMethod()));
}
@@ -270,6 +274,10 @@ public class BigQueryIOTranslation {
if (queryTempDataset != null) {
builder = builder.setQueryTempDataset(queryTempDataset);
}
+ String queryTempProject = configRow.getString("query_temp_project");
+ if (queryTempProject != null) {
+ builder = builder.setQueryTempProject(queryTempProject);
+ }
byte[] methodBytes = configRow.getBytes("method");
if (methodBytes != null) {
builder = builder.setMethod((TypedRead.Method)
fromByteArray(methodBytes));
@@ -576,7 +584,7 @@ public class BigQueryIOTranslation {
"deterministic_record_id_fn",
toByteArray(transform.getDeterministicRecordIdFn()));
}
if (transform.getWriteTempDataset() != null) {
- fieldValues.put("write_temp_dataset", toByteArray(transform.getDeterministicRecordIdFn()));
+ fieldValues.put("write_temp_dataset", toByteArray(transform.getWriteTempDataset()));
}
if (transform.getRowMutationInformationFn() != null) {
fieldValues.put(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
index be49777417e..f5334a41cdc 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
@@ -92,6 +92,7 @@ class BigQueryQueryHelper {
QueryPriority priority,
@Nullable String location,
@Nullable String queryTempDatasetId,
+ @Nullable String queryTempProjectId,
@Nullable String kmsKey)
throws InterruptedException, IOException {
// Step 1: Find the effective location of the query.
@@ -128,13 +129,15 @@ class BigQueryQueryHelper {
BigQueryResourceNaming.createJobIdPrefix(
options.getJobName(), stepUuid, JobType.QUERY,
BigQueryHelpers.randomUUIDString());
Optional<String> queryTempDatasetOpt =
Optional.ofNullable(queryTempDatasetId);
+ String project = queryTempProjectId;
+ if (project == null) {
+ project =
+ options.getBigQueryProject() == null
+ ? options.getProject()
+ : options.getBigQueryProject();
+ }
TableReference queryResultTable =
- createTempTableReference(
- options.getBigQueryProject() == null
- ? options.getProject()
- : options.getBigQueryProject(),
- tempTableID,
- queryTempDatasetOpt);
+ createTempTableReference(project, tempTableID, queryTempDatasetOpt);
boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
// Create dataset only if it has not been set by the user
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 585b58aa366..b4035a4e9ac 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -47,6 +47,7 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
private final BigQueryIO.TypedRead.QueryPriority priority;
private final String location;
private final String tempDatasetId;
+ private final String tempProjectId;
private final String kmsKey;
private transient AtomicReference<@Nullable JobStatistics> dryRunJobStats;
@@ -59,9 +60,18 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
BigQueryIO.TypedRead.QueryPriority priority,
String location,
String tempDatasetId,
+ String tempProjectId,
String kmsKey) {
return new BigQueryQuerySourceDef(
- bqServices, query, flattenResults, useLegacySql, priority, location, tempDatasetId, kmsKey);
+ bqServices,
+ query,
+ flattenResults,
+ useLegacySql,
+ priority,
+ location,
+ tempDatasetId,
+ tempProjectId,
+ kmsKey);
}
private BigQueryQuerySourceDef(
@@ -72,6 +82,7 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
BigQueryIO.TypedRead.QueryPriority priority,
String location,
String tempDatasetId,
+ String tempProjectId,
String kmsKey) {
this.query = checkNotNull(query, "query");
this.flattenResults = checkNotNull(flattenResults, "flattenResults");
@@ -80,6 +91,7 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
this.priority = priority;
this.location = location;
this.tempDatasetId = tempDatasetId;
+ this.tempProjectId = tempProjectId;
this.kmsKey = kmsKey;
dryRunJobStats = new AtomicReference<>();
}
@@ -121,16 +133,22 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef
{
priority,
location,
tempDatasetId,
+ tempProjectId,
kmsKey);
}
void cleanupTempResource(BigQueryOptions bqOptions, String stepUuid) throws
Exception {
Optional<String> queryTempDatasetOpt = Optional.ofNullable(tempDatasetId);
+ String project = tempProjectId;
+ if (project == null) {
+ project =
+ bqOptions.getBigQueryProject() == null
+ ? bqOptions.getProject()
+ : bqOptions.getBigQueryProject();
+ }
TableReference tableToRemove =
createTempTableReference(
- bqOptions.getBigQueryProject() == null
- ? bqOptions.getProject()
- : bqOptions.getBigQueryProject(),
+ project,
BigQueryResourceNaming.createJobIdPrefix(
bqOptions.getJobName(), stepUuid, JobType.QUERY),
queryTempDatasetOpt);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 2d87c7c1c7d..a2350ef19a7 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -46,6 +46,7 @@ class BigQueryStorageQuerySource<T> extends
BigQueryStorageSourceBase<T> {
QueryPriority priority,
@Nullable String location,
@Nullable String queryTempDataset,
+ @Nullable String queryTempProject,
@Nullable String kmsKey,
@Nullable DataFormat format,
SerializableFunction<SchemaAndRecord, T> parseFn,
@@ -59,6 +60,7 @@ class BigQueryStorageQuerySource<T> extends
BigQueryStorageSourceBase<T> {
priority,
location,
queryTempDataset,
+ queryTempProject,
kmsKey,
format,
parseFn,
@@ -85,6 +87,7 @@ class BigQueryStorageQuerySource<T> extends
BigQueryStorageSourceBase<T> {
priority,
location,
null,
+ null,
kmsKey,
null,
parseFn,
@@ -99,6 +102,8 @@ class BigQueryStorageQuerySource<T> extends
BigQueryStorageSourceBase<T> {
private final QueryPriority priority;
private final @Nullable String location;
private final @Nullable String queryTempDataset;
+
+ private final @Nullable String queryTempProject;
private final @Nullable String kmsKey;
private transient AtomicReference<@Nullable JobStatistics> dryRunJobStats;
@@ -111,6 +116,7 @@ class BigQueryStorageQuerySource<T> extends
BigQueryStorageSourceBase<T> {
QueryPriority priority,
@Nullable String location,
@Nullable String queryTempDataset,
+ @Nullable String queryTempProject,
@Nullable String kmsKey,
@Nullable DataFormat format,
SerializableFunction<SchemaAndRecord, T> parseFn,
@@ -124,6 +130,7 @@ class BigQueryStorageQuerySource<T> extends
BigQueryStorageSourceBase<T> {
this.priority = checkNotNull(priority, "priority");
this.location = location;
this.queryTempDataset = queryTempDataset;
+ this.queryTempProject = queryTempProject;
this.kmsKey = kmsKey;
this.dryRunJobStats = new AtomicReference<>();
}
@@ -170,6 +177,7 @@ class BigQueryStorageQuerySource<T> extends
BigQueryStorageSourceBase<T> {
priority,
location,
queryTempDataset,
+ queryTempProject,
kmsKey);
try (DatasetService datasetService =
bqServices.getDatasetService(options)) {
return datasetService.getTable(queryResultTable);
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index bc75ba8bd9b..b290cee89e2 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -301,7 +301,16 @@ public class BigQueryIOReadTest implements Serializable {
private void checkTypedReadQueryObject(
BigQueryIO.TypedRead<?> read, String query, String kmsKey, String
tempDataset) {
- checkTypedReadQueryObjectWithValidate(read, query, kmsKey, tempDataset, true);
+ checkTypedReadQueryObjectWithValidate(read, query, kmsKey, tempDataset, null, true);
+ }
+
+ private void checkTypedReadQueryObject(
+ BigQueryIO.TypedRead<?> read,
+ String query,
+ String kmsKey,
+ String tempDataset,
+ String tempProject) {
+ checkTypedReadQueryObjectWithValidate(read, query, kmsKey, tempDataset, tempProject, true);
}
private void checkReadTableObjectWithValidate(
@@ -325,11 +334,13 @@ public class BigQueryIOReadTest implements Serializable {
String query,
String kmsKey,
String tempDataset,
+ String tempProject,
boolean validate) {
assertNull(read.getTable());
assertEquals(query, read.getQuery().get());
assertEquals(kmsKey, read.getKmsKey());
assertEquals(tempDataset, read.getQueryTempDataset());
+ assertEquals(tempProject, read.getQueryTempProject());
assertEquals(validate, read.getValidate());
}
@@ -396,6 +407,16 @@ public class BigQueryIOReadTest implements Serializable {
checkTypedReadQueryObject(read, "foo_query", "kms_key", "temp_dataset");
}
+ @Test
+ public void testBuildQueryBasedTypedReadSourceWithTempProject() {
+ BigQueryIO.TypedRead<?> read =
+ BigQueryIO.readTableRows()
+ .fromQuery("foo_query")
+ .withKmsKey("kms_key")
+ .withQueryTempProjectAndDataset("temp_project", "temp_dataset");
+ checkTypedReadQueryObject(read, "foo_query", "kms_key", "temp_dataset", "temp_project");
+ }
+
@Test
@ProjectOverride
public void testValidateReadSetsBigQueryProject() throws Exception {
@@ -911,6 +932,7 @@ public class BigQueryIOReadTest implements Serializable {
QueryPriority.BATCH,
null,
null,
+ null,
null)
.toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn,
false);
@@ -987,6 +1009,7 @@ public class BigQueryIOReadTest implements Serializable {
QueryPriority.BATCH,
null,
null,
+ null,
null)
.toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn,
false);
@@ -1059,6 +1082,7 @@ public class BigQueryIOReadTest implements Serializable {
QueryPriority.BATCH,
null,
null,
+ null,
null)
.toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn,
false);
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index 0c5325286dd..497653f9ab8 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -307,6 +307,7 @@ public class BigQueryIOStorageQueryTest {
/* priority = */ QueryPriority.INTERACTIVE,
/* location = */ null,
/* queryTempDataset = */ null,
+ /* queryTempProject = */ null,
/* kmsKey = */ null,
null,
new TableRowParser(),
@@ -419,6 +420,7 @@ public class BigQueryIOStorageQueryTest {
/* priority = */ QueryPriority.BATCH,
/* location = */ null,
/* queryTempDataset = */ null,
+ /* queryTempProject = */ null,
/* kmsKey = */ null,
null,
new TableRowParser(),
@@ -519,6 +521,7 @@ public class BigQueryIOStorageQueryTest {
/* priority = */ QueryPriority.BATCH,
/* location = */ null,
/* queryTempDataset = */ null,
+ /* queryTempProject = */ null,
/* kmsKey = */ null,
null,
new TableRowParser(),
@@ -666,6 +669,7 @@ public class BigQueryIOStorageQueryTest {
/* priority = */ QueryPriority.BATCH,
/* location = */ null,
/* queryTempDataset = */ null,
+ /* queryTempProject = */ null,
/* kmsKey = */ null,
DataFormat.AVRO,
new TableRowParser(),
@@ -737,6 +741,7 @@ public class BigQueryIOStorageQueryTest {
/* priority = */ QueryPriority.BATCH,
/* location = */ null,
/* queryTempDataset = */ null,
+ /* queryTempProject = */ null,
/* kmsKey = */ null,
null,
new TableRowParser(),
@@ -761,6 +766,7 @@ public class BigQueryIOStorageQueryTest {
/* priority = */ QueryPriority.INTERACTIVE,
/* location = */ "asia-northeast1",
/* queryTempDataset = */ null,
+ /* queryTempProject = */ null,
/* kmsKey = */ null,
null,
new TableRowParser(),
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index 3654304f439..950ebcaafe4 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -60,6 +60,7 @@ public class BigQueryIOTranslationTest {
READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryPriority", "query_priority");
READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryLocation", "query_location");
READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryTempDataset",
"query_temp_dataset");
+ READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryTempProject", "query_temp_project");
READ_TRANSFORM_SCHEMA_MAPPING.put("getMethod", "method");
READ_TRANSFORM_SCHEMA_MAPPING.put("getFormat", "format");
READ_TRANSFORM_SCHEMA_MAPPING.put("getSelectedFields", "selected_fields");