Repository: beam Updated Branches: refs/heads/master 9f904dc00 -> 9d48bd5e8
[BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API concepts). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/521488f8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/521488f8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/521488f8 Branch: refs/heads/master Commit: 521488f8239547c7e93c30e75ecb2462ff114cb8 Parents: 9f904dc Author: Luke Cwik <lc...@google.com> Authored: Fri Jun 30 10:21:55 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Mon Jul 10 15:53:07 2017 -0700 ---------------------------------------------------------------------- .../fn-api/src/main/proto/beam_fn_api.proto | 151 +------------------ .../harness/control/ProcessBundleHandler.java | 4 +- .../fn/harness/control/RegisterHandler.java | 2 +- .../fn/harness/control/RegisterHandlerTest.java | 8 +- .../apache_beam/runners/pipeline_context.py | 2 +- .../runners/portability/fn_api_runner.py | 2 +- .../apache_beam/runners/worker/sdk_worker.py | 4 +- .../runners/worker/sdk_worker_test.py | 16 +- 8 files changed, 25 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/common/fn-api/src/main/proto/beam_fn_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto index 8162bc5..9da5afe 100644 --- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto +++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto @@ -38,7 +38,6 @@ option java_package = "org.apache.beam.fn.v1"; option java_outer_classname = "BeamFnApi"; import "beam_runner_api.proto"; -import "google/protobuf/any.proto"; import "google/protobuf/timestamp.proto"; /* @@ -67,129 +66,6 @@ message Target { string name = 2; } -// (Deprecated) Information defining a PCollection -// -// 
Migrate to Runner API. -message PCollection { - // (Required) A reference to a coder. - string coder_reference = 1 [deprecated = true]; - - // TODO: Windowing strategy, ... -} - -// (Deprecated) A primitive transform within Apache Beam. -// -// Migrate to Runner API. -message PrimitiveTransform { - // (Required) A pipeline level unique id which can be used as a reference to - // refer to this. - string id = 1 [deprecated = true]; - - // (Required) A function spec that is used by this primitive - // transform to process data. - FunctionSpec function_spec = 2 [deprecated = true]; - - // A map of distinct input names to target definitions. - // For example, in CoGbk this represents the tag name associated with each - // distinct input name and a list of primitive transforms that are associated - // with the specified input. - map<string, Target.List> inputs = 3 [deprecated = true]; - - // A map from local output name to PCollection definitions. For example, in - // DoFn this represents the tag name associated with each distinct output. - map<string, PCollection> outputs = 4 [deprecated = true]; - - // TODO: Should we model side inputs as a special type of input for a - // primitive transform or should it be modeled as the relationship that - // the predecessor input will be a view primitive transform. - // A map of from side input names to side inputs. - map<string, SideInput> side_inputs = 5 [deprecated = true]; - - // The user name of this step. - // TODO: This should really be in display data and not at this level - string step_name = 6 [deprecated = true]; -} - -/* - * User Definable Functions - * - * This is still unstable mainly due to how we model the side input. - */ - -// (Deprecated) Defines the common elements of user-definable functions, -// to allow the SDK to express the information the runner needs to execute work. -// -// Migrate to Runner API. 
-message FunctionSpec { - // (Required) A pipeline level unique id which can be used as a reference to - // refer to this. - string id = 1 [deprecated = true]; - - // (Required) A globally unique name representing this user definable - // function. - // - // User definable functions use the urn encodings registered such that another - // may implement the user definable function within another language. - // - // For example: - // urn:org.apache.beam:coder:kv:1.0 - string urn = 2 [deprecated = true]; - - // (Required) Reference to specification of execution environment required to - // invoke this function. - string environment_reference = 3 [deprecated = true]; - - // Data used to parameterize this function. Depending on the urn, this may be - // optional or required. - google.protobuf.Any data = 4 [deprecated = true]; -} - -// (Deprecated) Migrate to Runner API. -message SideInput { - // TODO: Coder? - - // For RunnerAPI. - Target input = 1 [deprecated = true]; - - // For FnAPI. - FunctionSpec view_fn = 2 [deprecated = true]; -} - -// (Deprecated) Defines how to encode values into byte streams and decode -// values from byte streams. A coder can be parameterized by additional -// properties which may or may not be language agnostic. -// -// Coders using the urn:org.apache.beam:coder namespace must have their -// encodings registered such that another may implement the encoding within -// another language. -// -// For example: -// urn:org.apache.beam:coder:kv:1.0 -// urn:org.apache.beam:coder:iterable:1.0 -// -// Migrate to Runner API. -message Coder { - // TODO: This looks weird when compared to the other function specs - // which use URN to differentiate themselves. Should "Coder" be embedded - // inside the FunctionSpec data block. - - // The data associated with this coder used to reconstruct it. - FunctionSpec function_spec = 1 [deprecated = true]; - - // A list of component coder references. 
- // - // For a key-value coder, there must be exactly two component coder references - // where the first reference represents the key coder and the second reference - // is the value coder. - // - // For an iterable coder, there must be exactly one component coder reference - // representing the value coder. - // - // TODO: Perhaps this is redundant with the data of the FunctionSpec - // for known coders? - repeated string component_coder_reference = 2 [deprecated = true]; -} - // A descriptor for connecting to a remote port using the Beam Fn Data API. // Allows for communication between two environments (for example between the // runner and the SDK). @@ -278,33 +154,20 @@ message ProcessBundleDescriptor { // refer to this. string id = 1; - // (Deprecated) A list of primitive transforms that should - // be used to construct the bundle processing graph. - // - // Migrate to Runner API definitions found within transforms field. - repeated PrimitiveTransform primitive_transform = 2 [deprecated = true]; - - // (Deprecated) The set of all coders referenced in this bundle. - // - // Migrate to Runner API defintions found within codersyyy field. - repeated Coder coders = 4 [deprecated = true]; - // (Required) A map from pipeline-scoped id to PTransform. - map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 5; + map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 2; // (Required) A map from pipeline-scoped id to PCollection. - map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 6; + map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 3; // (Required) A map from pipeline-scoped id to WindowingStrategy. - map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 7; + map<string, org.apache.beam.runner_api.v1.WindowingStrategy> windowing_strategies = 4; // (Required) A map from pipeline-scoped id to Coder. - // TODO: Rename to "coders" once deprecated coders field is removed. 
Unique - // name is choosen to make it an easy search/replace - map<string, org.apache.beam.runner_api.v1.Coder> codersyyy = 8; + map<string, org.apache.beam.runner_api.v1.Coder> coders = 5; // (Required) A map from pipeline-scoped id to Environment. - map<string, org.apache.beam.runner_api.v1.Environment> environments = 9; + map<string, org.apache.beam.runner_api.v1.Environment> environments = 6; } // A request to process a given bundle. @@ -385,14 +248,14 @@ message PrimitiveTransformSplit { // // For example, a remote GRPC source will have a specific urn and data // block containing an ElementCountRestriction. - FunctionSpec completed_restriction = 2; + org.apache.beam.runner_api.v1.FunctionSpec completed_restriction = 2; // (Required) A function specification describing the restriction // representing the remainder of work for the primitive transform. // // For example, a remote GRPC source will have a specific urn and data // block containing an ElementCountSkipRestriction. - FunctionSpec remaining_restriction = 3; + org.apache.beam.runner_api.v1.FunctionSpec remaining_restriction = 3; } message ProcessBundleSplitResponse { http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 4c4f73d..2a9cef8 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; /** * Processes {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s by materializing - * the set of required runners for each {@link 
org.apache.beam.fn.v1.BeamFnApi.FunctionSpec}, + * the set of required runners for each {@link RunnerApi.FunctionSpec}, * wiring them together based upon the {@code input} and {@code output} map definitions. * * <p>Finally executes the DAG based graph by starting all runners in reverse topological order, @@ -166,7 +166,7 @@ public class ProcessBundleHandler { pTransform, processBundleInstructionId, processBundleDescriptor.getPcollectionsMap(), - processBundleDescriptor.getCodersyyyMap(), + processBundleDescriptor.getCodersMap(), pCollectionIdsToConsumers, addStartFunction, addFinishFunction); http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java index 276a120..0e738ac 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java @@ -79,7 +79,7 @@ public class RegisterHandler { processBundleDescriptor.getClass()); computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor); for (Map.Entry<String, RunnerApi.Coder> entry - : processBundleDescriptor.getCodersyyyMap().entrySet()) { + : processBundleDescriptor.getCodersMap().entrySet()) { LOG.debug("Registering {} with type {}", entry.getKey(), entry.getValue().getClass()); http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java index b1f4410..2b275af 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java @@ -44,14 +44,14 @@ public class RegisterHandlerTest { .setRegister(BeamFnApi.RegisterRequest.newBuilder() .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder() .setId("1L") - .putCodersyyy("10L", RunnerApi.Coder.newBuilder() + .putCoders("10L", RunnerApi.Coder.newBuilder() .setSpec(RunnerApi.SdkFunctionSpec.newBuilder() .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:10L").build()) .build()) .build()) .build()) .addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("2L") - .putCodersyyy("20L", RunnerApi.Coder.newBuilder() + .putCoders("20L", RunnerApi.Coder.newBuilder() .setSpec(RunnerApi.SdkFunctionSpec.newBuilder() .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:20L").build()) .build()) @@ -82,10 +82,10 @@ public class RegisterHandlerTest { assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1), handler.getById("2L")); assertEquals( - REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCodersyyyOrThrow("10L"), + REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCodersOrThrow("10L"), handler.getById("10L")); assertEquals( - REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCodersyyyOrThrow("20L"), + REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCodersOrThrow("20L"), handler.getById("20L")); assertEquals(REGISTER_RESPONSE, responseFuture.get()); } http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/python/apache_beam/runners/pipeline_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/pipeline_context.py 
b/sdks/python/apache_beam/runners/pipeline_context.py index c2ae3f3..a40069b 100644 --- a/sdks/python/apache_beam/runners/pipeline_context.py +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -84,7 +84,7 @@ class PipelineContext(object): def __init__(self, proto=None): if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor): proto = beam_runner_api_pb2.Components( - coders=dict(proto.codersyyy.items()), + coders=dict(proto.coders.items()), windowing_strategies=dict(proto.windowing_strategies.items()), environments=dict(proto.environments.items())) for name, cls in self._COMPONENT_TYPES.items(): http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/python/apache_beam/runners/portability/fn_api_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index c5438ad..f522864 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -261,7 +261,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): id=self._next_uid(), transforms=transform_protos, pcollections=pcollection_protos, - codersyyy=dict(context_proto.coders.items()), + coders=dict(context_proto.coders.items()), windowing_strategies=dict(context_proto.windowing_strategies.items()), environments=dict(context_proto.environments.items())) return input_data, side_input_data, runner_sinks, process_bundle_descriptor http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/python/apache_beam/runners/worker/sdk_worker.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index e1ddfb7..ae86830 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ 
b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -249,8 +249,6 @@ class SdkWorker(object): def register(self, request, unused_instruction_id=None): for process_bundle_descriptor in request.process_bundle_descriptor: self.fns[process_bundle_descriptor.id] = process_bundle_descriptor - for p_transform in list(process_bundle_descriptor.primitive_transform): - self.fns[p_transform.function_spec.id] = p_transform.function_spec return beam_fn_api_pb2.RegisterResponse() def create_execution_tree(self, descriptor): @@ -355,7 +353,7 @@ class BeamTransformFactory(object): return creator(self, transform_id, transform_proto, parameter, consumers) def get_coder(self, coder_id): - coder_proto = self.descriptor.codersyyy[coder_id] + coder_proto = self.descriptor.coders[coder_id] if coder_proto.spec.spec.urn: return self.context.coders.get_by_id(coder_id) else: http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/python/apache_beam/runners/worker/sdk_worker_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py index 553d5b8..dc72a5f 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py @@ -28,6 +28,7 @@ from concurrent import futures import grpc from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.worker import sdk_worker @@ -61,13 +62,12 @@ class BeamFnControlServicer(beam_fn_api_pb2.BeamFnControlServicer): class SdkWorkerTest(unittest.TestCase): def test_fn_registration(self): - fns = [beam_fn_api_pb2.FunctionSpec(id=str(ix)) for ix in range(4)] - - process_bundle_descriptors = [beam_fn_api_pb2.ProcessBundleDescriptor( - id=str(100+ix), - primitive_transform=[ - beam_fn_api_pb2.PrimitiveTransform(function_spec=fn)]) - for ix, fn in 
enumerate(fns)] + process_bundle_descriptors = [ + beam_fn_api_pb2.ProcessBundleDescriptor( + id=str(100+ix), + transforms={ + str(ix): beam_runner_api_pb2.PTransform(unique_name=str(ix))}) + for ix in range(4)] test_controller = BeamFnControlServicer([beam_fn_api_pb2.InstructionRequest( register=beam_fn_api_pb2.RegisterRequest( @@ -83,7 +83,7 @@ class SdkWorkerTest(unittest.TestCase): harness.run() self.assertEqual( harness.worker.fns, - {item.id: item for item in fns + process_bundle_descriptors}) + {item.id: item for item in process_bundle_descriptors}) if __name__ == "__main__":