This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 73963a5 [BEAM-11932] Add Dataflow ServiceOptions.
new 734c0a8 Merge pull request #14159 from [BEAM-11932] Add Dataflow
ServiceOptions.
73963a5 is described below
commit 73963a52341edfed25df2328d4502b9ba40caabc
Author: Tyson Hamilton <[email protected]>
AuthorDate: Mon Mar 8 22:20:40 2021 +0000
[BEAM-11932] Add Dataflow ServiceOptions.
Introduce service options for Dataflow. These opaque options decouple
service-side feature availability from the Apache Beam release cycle.
The sole purpose of these options is to allow retroactively exposing a
service-side feature on previously released SDK versions.
This also includes bumping client library versions.
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 18 +++++++--------
.../dataflow/DataflowPipelineTranslator.java | 1 +
.../dataflow/options/DataflowPipelineOptions.java | 12 ++++++++++
.../dataflow/DataflowPipelineTranslatorTest.java | 27 ++++++++++++++++++++++
.../io/gcp/healthcare/HttpHealthcareApiClient.java | 13 +++++------
5 files changed, 55 insertions(+), 16 deletions(-)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 84cf4ac..5843660 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -423,7 +423,7 @@ class BeamModulePlugin implements Plugin<Project> {
def checkerframework_version = "3.10.0"
def classgraph_version = "4.8.65"
def errorprone_version = "2.3.4"
- def google_clients_version = "1.30.10"
+ def google_clients_version = "1.31.0"
def google_cloud_bigdataoss_version = "2.1.6"
def google_cloud_pubsublite_version = "0.7.0"
def google_code_gson_version = "2.8.6"
@@ -501,17 +501,17 @@ class BeamModulePlugin implements Plugin<Project> {
error_prone_annotations :
"com.google.errorprone:error_prone_annotations:$errorprone_version",
gax : "com.google.api:gax", //
google_cloud_platform_libraries_bom sets version
gax_grpc :
"com.google.api:gax-grpc", // google_cloud_platform_libraries_bom sets version
- google_api_client :
"com.google.api-client:google-api-client:$google_clients_version",
+ google_api_client :
"com.google.api-client:google-api-client:1.31.1", // 1.31.1 is required to run
1.31.0 of google_clients_version below.
google_api_client_jackson2 :
"com.google.api-client:google-api-client-jackson2:$google_clients_version",
google_api_client_java6 :
"com.google.api-client:google-api-client-java6:$google_clients_version",
google_api_common :
"com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
- google_api_services_bigquery :
"com.google.apis:google-api-services-bigquery:v2-rev20201030-$google_clients_version",
- google_api_services_clouddebugger :
"com.google.apis:google-api-services-clouddebugger:v2-rev20200501-$google_clients_version",
- google_api_services_cloudresourcemanager :
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200720-$google_clients_version",
- google_api_services_dataflow :
"com.google.apis:google-api-services-dataflow:v1b3-rev20200713-$google_clients_version",
- google_api_services_healthcare :
"com.google.apis:google-api-services-healthcare:v1beta1-rev20200713-$google_clients_version",
- google_api_services_pubsub :
"com.google.apis:google-api-services-pubsub:v1-rev20200713-$google_clients_version",
- google_api_services_storage :
"com.google.apis:google-api-services-storage:v1-rev20200611-$google_clients_version",
+ google_api_services_bigquery :
"com.google.apis:google-api-services-bigquery:v2-rev20210219-$google_clients_version",
+ google_api_services_clouddebugger :
"com.google.apis:google-api-services-clouddebugger:v2-rev20200807-$google_clients_version",
+ google_api_services_cloudresourcemanager :
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20210222-$google_clients_version",
+ google_api_services_dataflow :
"com.google.apis:google-api-services-dataflow:v1b3-rev20210217-$google_clients_version",
+ google_api_services_healthcare :
"com.google.apis:google-api-services-healthcare:v1beta1-rev20210217-$google_clients_version",
+ google_api_services_pubsub :
"com.google.apis:google-api-services-pubsub:v1-rev20210208-$google_clients_version",
+ google_api_services_storage :
"com.google.apis:google-api-services-storage:v1-rev20210127-$google_clients_version",
google_auth_library_credentials :
"com.google.auth:google-auth-library-credentials", //
google_cloud_platform_libraries_bom sets version
google_auth_library_oauth2_http :
"com.google.auth:google-auth-library-oauth2-http", //
google_cloud_platform_libraries_bom sets version
google_cloud_bigquery :
"com.google.cloud:google-cloud-bigquery", //
google_cloud_platform_libraries_bom sets version
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index b4101f5..87c4b60 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -334,6 +334,7 @@ public class DataflowPipelineTranslator {
Environment environment = new Environment();
job.setEnvironment(environment);
+ job.getEnvironment().setServiceOptions(options.getServiceOptions());
WorkerPool workerPool = new WorkerPool();
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index cd5831a..e8c3708 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.options;
+import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.annotations.Experimental;
@@ -109,6 +110,17 @@ public interface DataflowPipelineOptions
void setTemplateLocation(String value);
+ /**
+ * Service options are set by the user and configure the service. This
decouples service side
+ * feature availability from the Apache Beam release cycle.
+ */
+ @Description(
+ "Service options are set by the user and configure the service. This "
+ + "decouples service side feature availability from the Apache Beam
release cycle.")
+ List<String> getServiceOptions();
+
+ void setServiceOptions(List<String> options);
+
/** Run the job as a specific service account, instead of the default GCE
robot. */
@Hidden
@Experimental
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index dc8e753..3374d17 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -51,6 +51,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.ArtifactInformation;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
@@ -1437,6 +1439,31 @@ public class DataflowPipelineTranslatorTest implements
Serializable {
assertEquals(DataflowRunner.getContainerImageForJob(options),
payload.getContainerImage());
}
+ @Test
+ public void testServiceOptionsSet() throws IOException {
+ final List<String> serviceOptions =
+ Stream.of("whizz=bang", "foo=bar").collect(Collectors.toList());
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setServiceOptions(serviceOptions);
+
+ Pipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p,
sdkComponents, true);
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList())
+ .getJob();
+
+ assertEquals(serviceOptions, job.getEnvironment().getServiceOptions());
+ }
+
private static void assertAllStepOutputsHaveUniqueIds(Job job) throws
Exception {
List<String> outputIds = new ArrayList<>();
for (Step step : job.getSteps()) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
index b2fcb18..283d011 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
@@ -35,8 +35,8 @@ import
com.google.api.services.healthcare.v1beta1.model.DicomStore;
import com.google.api.services.healthcare.v1beta1.model.Empty;
import com.google.api.services.healthcare.v1beta1.model.ExportResourcesRequest;
import com.google.api.services.healthcare.v1beta1.model.FhirStore;
-import
com.google.api.services.healthcare.v1beta1.model.GoogleCloudHealthcareV1beta1FhirRestGcsDestination;
-import
com.google.api.services.healthcare.v1beta1.model.GoogleCloudHealthcareV1beta1FhirRestGcsSource;
+import
com.google.api.services.healthcare.v1beta1.model.GoogleCloudHealthcareV1beta1FhirGcsDestination;
+import
com.google.api.services.healthcare.v1beta1.model.GoogleCloudHealthcareV1beta1FhirGcsSource;
import com.google.api.services.healthcare.v1beta1.model.Hl7V2Store;
import com.google.api.services.healthcare.v1beta1.model.HttpBody;
import com.google.api.services.healthcare.v1beta1.model.ImportResourcesRequest;
@@ -491,8 +491,8 @@ public class HttpHealthcareApiClient implements
HealthcareApiClient, Serializabl
public Operation importFhirResource(
String fhirStore, String gcsSourcePath, @Nullable String
contentStructure)
throws IOException {
- GoogleCloudHealthcareV1beta1FhirRestGcsSource gcsSrc =
- new GoogleCloudHealthcareV1beta1FhirRestGcsSource();
+ GoogleCloudHealthcareV1beta1FhirGcsSource gcsSrc =
+ new GoogleCloudHealthcareV1beta1FhirGcsSource();
gcsSrc.setUri(gcsSourcePath);
ImportResourcesRequest importRequest = new ImportResourcesRequest();
@@ -509,9 +509,8 @@ public class HttpHealthcareApiClient implements
HealthcareApiClient, Serializabl
@Override
public Operation exportFhirResourceToGcs(String fhirStore, String
gcsDestinationPrefix)
throws IOException {
- GoogleCloudHealthcareV1beta1FhirRestGcsDestination gcsDst =
- new GoogleCloudHealthcareV1beta1FhirRestGcsDestination();
-
+ GoogleCloudHealthcareV1beta1FhirGcsDestination gcsDst =
+ new GoogleCloudHealthcareV1beta1FhirGcsDestination();
gcsDst.setUriPrefix(gcsDestinationPrefix);
ExportResourcesRequest exportRequest = new ExportResourcesRequest();
exportRequest.setGcsDestination(gcsDst);