Repository: beam
Updated Branches:
  refs/heads/master 9f904dc00 -> 9d48bd5e8


[BEAM-1348] Remove deprecated concepts in Fn API (now replaced with Runner API 
concepts).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/521488f8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/521488f8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/521488f8

Branch: refs/heads/master
Commit: 521488f8239547c7e93c30e75ecb2462ff114cb8
Parents: 9f904dc
Author: Luke Cwik <lc...@google.com>
Authored: Fri Jun 30 10:21:55 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jul 10 15:53:07 2017 -0700

----------------------------------------------------------------------
 .../fn-api/src/main/proto/beam_fn_api.proto     | 151 +------------------
 .../harness/control/ProcessBundleHandler.java   |   4 +-
 .../fn/harness/control/RegisterHandler.java     |   2 +-
 .../fn/harness/control/RegisterHandlerTest.java |   8 +-
 .../apache_beam/runners/pipeline_context.py     |   2 +-
 .../runners/portability/fn_api_runner.py        |   2 +-
 .../apache_beam/runners/worker/sdk_worker.py    |   4 +-
 .../runners/worker/sdk_worker_test.py           |  16 +-
 8 files changed, 25 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto 
b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 8162bc5..9da5afe 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -38,7 +38,6 @@ option java_package = "org.apache.beam.fn.v1";
 option java_outer_classname = "BeamFnApi";
 
 import "beam_runner_api.proto";
-import "google/protobuf/any.proto";
 import "google/protobuf/timestamp.proto";
 
 /*
@@ -67,129 +66,6 @@ message Target {
   string name = 2;
 }
 
-// (Deprecated) Information defining a PCollection
-//
-// Migrate to Runner API.
-message PCollection {
-  // (Required) A reference to a coder.
-  string coder_reference = 1 [deprecated = true];
-
-  // TODO: Windowing strategy, ...
-}
-
-// (Deprecated) A primitive transform within Apache Beam.
-//
-// Migrate to Runner API.
-message PrimitiveTransform {
-  // (Required) A pipeline level unique id which can be used as a reference to
-  // refer to this.
-  string id = 1 [deprecated = true];
-
-  // (Required) A function spec that is used by this primitive
-  // transform to process data.
-  FunctionSpec function_spec = 2 [deprecated = true];
-
-  // A map of distinct input names to target definitions.
-  // For example, in CoGbk this represents the tag name associated with each
-  // distinct input name and a list of primitive transforms that are associated
-  // with the specified input.
-  map<string, Target.List> inputs = 3 [deprecated = true];
-
-  // A map from local output name to PCollection definitions. For example, in
-  // DoFn this represents the tag name associated with each distinct output.
-  map<string, PCollection> outputs = 4 [deprecated = true];
-
-  // TODO: Should we model side inputs as a special type of input for a
-  // primitive transform or should it be modeled as the relationship that
-  // the predecessor input will be a view primitive transform.
-  // A map of from side input names to side inputs.
-  map<string, SideInput> side_inputs = 5 [deprecated = true];
-
-  // The user name of this step.
-  // TODO: This should really be in display data and not at this level
-  string step_name = 6 [deprecated = true];
-}
-
-/*
- * User Definable Functions
- *
- * This is still unstable mainly due to how we model the side input.
- */
-
-// (Deprecated) Defines the common elements of user-definable functions,
-// to allow the SDK to express the information the runner needs to execute 
work.
-//
-// Migrate to Runner API.
-message FunctionSpec {
-  // (Required) A pipeline level unique id which can be used as a reference to
-  // refer to this.
-  string id = 1 [deprecated = true];
-
-  // (Required) A globally unique name representing this user definable
-  // function.
-  //
-  // User definable functions use the urn encodings registered such that 
another
-  // may implement the user definable function within another language.
-  //
-  // For example:
-  //    urn:org.apache.beam:coder:kv:1.0
-  string urn = 2 [deprecated = true];
-
-  // (Required) Reference to specification of execution environment required to
-  // invoke this function.
-  string environment_reference = 3 [deprecated = true];
-
-  // Data used to parameterize this function. Depending on the urn, this may be
-  // optional or required.
-  google.protobuf.Any data = 4 [deprecated = true];
-}
-
-// (Deprecated) Migrate to Runner API.
-message SideInput {
-  // TODO: Coder?
-
-  // For RunnerAPI.
-  Target input = 1 [deprecated = true];
-
-  // For FnAPI.
-  FunctionSpec view_fn = 2 [deprecated = true];
-}
-
-// (Deprecated) Defines how to encode values into byte streams and decode
-// values from byte streams. A coder can be parameterized by additional
-// properties which may or may not be language agnostic.
-//
-// Coders using the urn:org.apache.beam:coder namespace must have their
-// encodings registered such that another may implement the encoding within
-// another language.
-//
-// For example:
-//    urn:org.apache.beam:coder:kv:1.0
-//    urn:org.apache.beam:coder:iterable:1.0
-//
-// Migrate to Runner API.
-message Coder {
-  // TODO: This looks weird when compared to the other function specs
-  // which use URN to differentiate themselves. Should "Coder" be embedded
-  // inside the FunctionSpec data block.
-
-  // The data associated with this coder used to reconstruct it.
-  FunctionSpec function_spec = 1 [deprecated = true];
-
-  // A list of component coder references.
-  //
-  // For a key-value coder, there must be exactly two component coder 
references
-  // where the first reference represents the key coder and the second 
reference
-  // is the value coder.
-  //
-  // For an iterable coder, there must be exactly one component coder reference
-  // representing the value coder.
-  //
-  // TODO: Perhaps this is redundant with the data of the FunctionSpec
-  // for known coders?
-  repeated string component_coder_reference = 2 [deprecated = true];
-}
-
 // A descriptor for connecting to a remote port using the Beam Fn Data API.
 // Allows for communication between two environments (for example between the
 // runner and the SDK).
@@ -278,33 +154,20 @@ message ProcessBundleDescriptor {
   // refer to this.
   string id = 1;
 
-  // (Deprecated) A list of primitive transforms that should
-  // be used to construct the bundle processing graph.
-  //
-  // Migrate to Runner API definitions found within transforms field.
-  repeated PrimitiveTransform primitive_transform = 2 [deprecated = true];
-
-  // (Deprecated) The set of all coders referenced in this bundle.
-  //
-  // Migrate to Runner API defintions found within codersyyy field.
-  repeated Coder coders = 4 [deprecated = true];
-
   // (Required) A map from pipeline-scoped id to PTransform.
-  map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 5;
+  map<string, org.apache.beam.runner_api.v1.PTransform> transforms = 2;
 
   // (Required) A map from pipeline-scoped id to PCollection.
-  map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 6;
+  map<string, org.apache.beam.runner_api.v1.PCollection> pcollections = 3;
 
   // (Required) A map from pipeline-scoped id to WindowingStrategy.
-  map<string, org.apache.beam.runner_api.v1.WindowingStrategy> 
windowing_strategies = 7;
+  map<string, org.apache.beam.runner_api.v1.WindowingStrategy> 
windowing_strategies = 4;
 
   // (Required) A map from pipeline-scoped id to Coder.
-  // TODO: Rename to "coders" once deprecated coders field is removed. Unique
-  // name is choosen to make it an easy search/replace
-  map<string, org.apache.beam.runner_api.v1.Coder> codersyyy = 8;
+  map<string, org.apache.beam.runner_api.v1.Coder> coders = 5;
 
   // (Required) A map from pipeline-scoped id to Environment.
-  map<string, org.apache.beam.runner_api.v1.Environment> environments = 9;
+  map<string, org.apache.beam.runner_api.v1.Environment> environments = 6;
 }
 
 // A request to process a given bundle.
@@ -385,14 +248,14 @@ message PrimitiveTransformSplit {
   //
   // For example, a remote GRPC source will have a specific urn and data
   // block containing an ElementCountRestriction.
-  FunctionSpec completed_restriction = 2;
+  org.apache.beam.runner_api.v1.FunctionSpec completed_restriction = 2;
 
   // (Required) A function specification describing the restriction
   // representing the remainder of work for the primitive transform.
   //
   // FOr example, a remote GRPC source will have a specific urn and data
   // block contain an ElemntCountSkipRestriction.
-  FunctionSpec remaining_restriction = 3;
+  org.apache.beam.runner_api.v1.FunctionSpec remaining_restriction = 3;
 }
 
 message ProcessBundleSplitResponse {

http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 4c4f73d..2a9cef8 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Processes {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s by 
materializing
- * the set of required runners for each {@link 
org.apache.beam.fn.v1.BeamFnApi.FunctionSpec},
+ * the set of required runners for each {@link RunnerApi.FunctionSpec},
  * wiring them together based upon the {@code input} and {@code output} map 
definitions.
  *
  * <p>Finally executes the DAG based graph by starting all runners in reverse 
topological order,
@@ -166,7 +166,7 @@ public class ProcessBundleHandler {
             pTransform,
             processBundleInstructionId,
             processBundleDescriptor.getPcollectionsMap(),
-            processBundleDescriptor.getCodersyyyMap(),
+            processBundleDescriptor.getCodersMap(),
             pCollectionIdsToConsumers,
             addStartFunction,
             addFinishFunction);

http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
index 276a120..0e738ac 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -79,7 +79,7 @@ public class RegisterHandler {
           processBundleDescriptor.getClass());
       
computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor);
       for (Map.Entry<String, RunnerApi.Coder> entry
-          : processBundleDescriptor.getCodersyyyMap().entrySet()) {
+          : processBundleDescriptor.getCodersMap().entrySet()) {
         LOG.debug("Registering {} with type {}",
             entry.getKey(),
             entry.getValue().getClass());

http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
index b1f4410..2b275af 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
@@ -44,14 +44,14 @@ public class RegisterHandlerTest {
       .setRegister(BeamFnApi.RegisterRequest.newBuilder()
           
.addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder()
               .setId("1L")
-              .putCodersyyy("10L", RunnerApi.Coder.newBuilder()
+              .putCoders("10L", RunnerApi.Coder.newBuilder()
                   .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
                       
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:10L").build())
                       .build())
                   .build())
               .build())
           
.addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("2L")
-              .putCodersyyy("20L", RunnerApi.Coder.newBuilder()
+              .putCoders("20L", RunnerApi.Coder.newBuilder()
                   .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
                       
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("urn:20L").build())
                       .build())
@@ -82,10 +82,10 @@ public class RegisterHandlerTest {
     assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1),
         handler.getById("2L"));
     assertEquals(
-        
REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCodersyyyOrThrow("10L"),
+        
REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCodersOrThrow("10L"),
         handler.getById("10L"));
     assertEquals(
-        
REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCodersyyyOrThrow("20L"),
+        
REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCodersOrThrow("20L"),
         handler.getById("20L"));
     assertEquals(REGISTER_RESPONSE, responseFuture.get());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py 
b/sdks/python/apache_beam/runners/pipeline_context.py
index c2ae3f3..a40069b 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -84,7 +84,7 @@ class PipelineContext(object):
   def __init__(self, proto=None):
     if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
       proto = beam_runner_api_pb2.Components(
-          coders=dict(proto.codersyyy.items()),
+          coders=dict(proto.coders.items()),
           windowing_strategies=dict(proto.windowing_strategies.items()),
           environments=dict(proto.environments.items()))
     for name, cls in self._COMPONENT_TYPES.items():

http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index c5438ad..f522864 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -261,7 +261,7 @@ class 
FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         id=self._next_uid(),
         transforms=transform_protos,
         pcollections=pcollection_protos,
-        codersyyy=dict(context_proto.coders.items()),
+        coders=dict(context_proto.coders.items()),
         windowing_strategies=dict(context_proto.windowing_strategies.items()),
         environments=dict(context_proto.environments.items()))
     return input_data, side_input_data, runner_sinks, process_bundle_descriptor

http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index e1ddfb7..ae86830 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -249,8 +249,6 @@ class SdkWorker(object):
   def register(self, request, unused_instruction_id=None):
     for process_bundle_descriptor in request.process_bundle_descriptor:
       self.fns[process_bundle_descriptor.id] = process_bundle_descriptor
-      for p_transform in list(process_bundle_descriptor.primitive_transform):
-        self.fns[p_transform.function_spec.id] = p_transform.function_spec
     return beam_fn_api_pb2.RegisterResponse()
 
   def create_execution_tree(self, descriptor):
@@ -355,7 +353,7 @@ class BeamTransformFactory(object):
     return creator(self, transform_id, transform_proto, parameter, consumers)
 
   def get_coder(self, coder_id):
-    coder_proto = self.descriptor.codersyyy[coder_id]
+    coder_proto = self.descriptor.coders[coder_id]
     if coder_proto.spec.spec.urn:
       return self.context.coders.get_by_id(coder_id)
     else:

http://git-wip-us.apache.org/repos/asf/beam/blob/521488f8/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index 553d5b8..dc72a5f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -28,6 +28,7 @@ from concurrent import futures
 import grpc
 
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.worker import sdk_worker
 
 
@@ -61,13 +62,12 @@ class 
BeamFnControlServicer(beam_fn_api_pb2.BeamFnControlServicer):
 class SdkWorkerTest(unittest.TestCase):
 
   def test_fn_registration(self):
-    fns = [beam_fn_api_pb2.FunctionSpec(id=str(ix)) for ix in range(4)]
-
-    process_bundle_descriptors = [beam_fn_api_pb2.ProcessBundleDescriptor(
-        id=str(100+ix),
-        primitive_transform=[
-            beam_fn_api_pb2.PrimitiveTransform(function_spec=fn)])
-                                  for ix, fn in enumerate(fns)]
+    process_bundle_descriptors = [
+        beam_fn_api_pb2.ProcessBundleDescriptor(
+            id=str(100+ix),
+            transforms={
+                str(ix): beam_runner_api_pb2.PTransform(unique_name=str(ix))})
+        for ix in range(4)]
 
     test_controller = 
BeamFnControlServicer([beam_fn_api_pb2.InstructionRequest(
         register=beam_fn_api_pb2.RegisterRequest(
@@ -83,7 +83,7 @@ class SdkWorkerTest(unittest.TestCase):
     harness.run()
     self.assertEqual(
         harness.worker.fns,
-        {item.id: item for item in fns + process_bundle_descriptors})
+        {item.id: item for item in process_bundle_descriptors})
 
 
 if __name__ == "__main__":

Reply via email to