This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5e15717 Pass pipeline options from caller to expansion service (#11574)
5e15717 is described below
commit 5e1571760b61b8ce247d5375b71c8df4d69d6409
Author: Brian Hulette <[email protected]>
AuthorDate: Tue May 5 13:39:52 2020 -0700
Pass pipeline options from caller to expansion service (#11574)
---
model/job-management/src/main/proto/beam_expansion_api.proto | 4 ++++
.../apache/beam/sdk/expansion/service/ExpansionService.java | 6 +++++-
sdks/python/apache_beam/runners/job/utils.py | 11 +++++++++++
.../python/apache_beam/runners/portability/portable_runner.py | 9 +--------
sdks/python/apache_beam/transforms/external.py | 9 ++++++++-
5 files changed, 29 insertions(+), 10 deletions(-)
diff --git a/model/job-management/src/main/proto/beam_expansion_api.proto b/model/job-management/src/main/proto/beam_expansion_api.proto
index e358c56..c2c9a72 100644
--- a/model/job-management/src/main/proto/beam_expansion_api.proto
+++ b/model/job-management/src/main/proto/beam_expansion_api.proto
@@ -30,6 +30,7 @@ option java_package = "org.apache.beam.model.expansion.v1";
option java_outer_classname = "ExpansionApi";
import "beam_runner_api.proto";
+import "google/protobuf/struct.proto";
message ExpansionRequest {
// Set of components needed to interpret the transform, or which
@@ -46,6 +47,9 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // The pipeline options specified by the caller
+ google.protobuf.Struct pipeline_options = 4;
}
message ExpansionResponse {
diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index e19754d..0e88313 100644
--- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -38,6 +38,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
@@ -313,8 +314,11 @@ public class ExpansionService extends
ExpansionServiceGrpc.ExpansionServiceImplB
request.getTransform().getUniqueName(),
request.getTransform().getSpec().getUrn());
LOG.debug("Full transform: {}", request.getTransform());
+
+ Pipeline pipeline =
+ Pipeline.create(PipelineOptionsTranslation.fromProto(request.getPipelineOptions()));
+
Set<String> existingTransformIds =
request.getComponents().getTransformsMap().keySet();
- Pipeline pipeline = Pipeline.create();
ExperimentalOptions.addExperiment(
pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
RehydratedComponents rehydratedComponents =
diff --git a/sdks/python/apache_beam/runners/job/utils.py b/sdks/python/apache_beam/runners/job/utils.py
index 1a90adb..672ab4c 100644
--- a/sdks/python/apache_beam/runners/job/utils.py
+++ b/sdks/python/apache_beam/runners/job/utils.py
@@ -28,6 +28,17 @@ from google.protobuf import json_format
from google.protobuf import struct_pb2
+def pipeline_options_dict_to_struct(options):
+ # type: (dict) -> struct_pb2.Struct
+ # TODO: Define URNs for options.
+ # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
+ return dict_to_struct({
+ 'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
+ for k,
+ v in options.items() if v is not None
+ })
+
+
def dict_to_struct(dict_obj):
# type: (dict) -> struct_pb2.Struct
return json_format.ParseDict(dict_obj, struct_pb2.Struct())
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 180a41e..b416352 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -164,14 +164,7 @@ class JobServiceHandle(object):
all_options = self.options.get_all_options(
add_extra_args_fn=add_runner_options,
retain_unknown_options=self._retain_unknown_options)
- # TODO: Define URNs for options.
- # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
- p_options = {
- 'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
- for k,
- v in all_options.items() if v is not None
- }
- return job_utils.dict_to_struct(p_options)
+ return job_utils.pipeline_options_dict_to_struct(all_options)
def prepare(self, proto_pipeline):
# type: (beam_runner_api_pb2.Pipeline) -> beam_job_api_pb2.PrepareJobResponse
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 4f31cbb..cfb5e67 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -37,6 +37,7 @@ from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
from apache_beam.runners import pipeline_context
+from apache_beam.runners.job import utils as job_utils
from apache_beam.transforms import ptransform
from apache_beam.typehints.native_type_compatibility import convert_to_beam_type
from apache_beam.typehints.trivial_inference import instance_to_type
@@ -309,10 +310,16 @@ class ExternalTransform(ptransform.PTransform):
urn=common_urns.primitives.IMPULSE.urn),
outputs={'out': transform_proto.inputs[tag]}))
components = context.to_runner_api()
+
+ # Retain unknown options since they may only be relevant to the expanding
+ # SDK
+ options = pipeline._options.get_all_options(
+ drop_default=True, retain_unknown_options=True)
request = beam_expansion_api_pb2.ExpansionRequest(
components=components,
namespace=self._namespace, # type: ignore # mypy thinks self._namespace is threading.local
- transform=transform_proto)
+ transform=transform_proto,
+ pipeline_options=job_utils.pipeline_options_dict_to_struct(options))
if isinstance(self._expansion_service, str):
# Some environments may not support unsecure channels. Hence using a