Repository: beam Updated Branches: refs/heads/master 79b1395c2 -> d84b06791
Inline rather than reference FunctionSpecs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d390406e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d390406e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d390406e Branch: refs/heads/master Commit: d390406e27112faed31233d7daef1f650a31cd0f Parents: 79b1395 Author: Robert Bradshaw <rober...@gmail.com> Authored: Tue Feb 28 15:51:24 2017 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Wed Mar 1 09:04:30 2017 -0800 ---------------------------------------------------------------------- .../src/main/proto/beam_runner_api.proto | 39 +++++++++----------- .../beam/sdk/util/WindowingStrategies.java | 18 ++------- 2 files changed, 20 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d390406e/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 58532b2..44ead56 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -47,10 +47,6 @@ message Components { // (Required) A map from pipeline-scoped id to Environment. map<string, Environment> environments = 5; - - // (Required) A map from pipeline-scoped id to FunctionSpec, - // a record for a particular user-defined function. - map<string, FunctionSpec> function_specs = 6; } // A disjoint union of all the things that may contain references @@ -207,8 +203,8 @@ message PCollection { // The payload for the primitive ParDo transform. message ParDoPayload { - // (Required) The pipeline-scoped id of the FunctionSpec for the DoFn. - string fn_id = 1; + // (Required) The FunctionSpec of the DoFn. + FunctionSpec do_fn = 1; // (Required) Additional pieces of context the DoFn may require that // are not otherwise represented in the payload. @@ -266,9 +262,8 @@ enum IsBounded { // The payload for the primitive Read transform. message ReadPayload { - // (Required) The pipeline-scoped id of the FunctionSpec of the source for - // this Read. - string source_id = 1; + // (Required) The FunctionSpec of the source for this Read. + FunctionSpec source = 1; // (Required) Whether the source is bounded or unbounded IsBounded is_bounded = 2; @@ -279,15 +274,15 @@ message ReadPayload { // The payload for the WindowInto transform. message WindowIntoPayload { - // (Required) The pipeline-scoped id for the FunctionSpec of the WindowFn. - string fn_id = 1; + // (Required) The FunctionSpec of the WindowFn. + FunctionSpec window_fn = 1; } // The payload for the special-but-not-primitive Combine transform. message CombinePayload { - // (Required) The pipeline-scoped id of the FunctionSpec for the CombineFn. - string fn_id = 1; + // (Required) The FunctionSpec of the CombineFn. + FunctionSpec combine_fn = 1; // (Required) A reference to the Coder to use for accumulators of the CombineFn string accumulator_coder_id = 2; @@ -325,10 +320,10 @@ message Coder { // TODO: consider inlining field on PCollection message WindowingStrategy { - // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that - // assigns windows, merges windows, and shifts timestamps before they are + // (Required) The FunctionSpec of the UDF that assigns windows, + // merges windows, and shifts timestamps before they are // combined according to the OutputTime. - string fn_id = 1; + FunctionSpec window_fn = 1; // (Required) Whether or not the window fn is merging. // @@ -584,20 +579,20 @@ message SideInput { // URN) UrnWithParameter access_pattern = 1; - // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that - // adapts a particular access_pattern to a user-facing view type. + // (Required) The FunctionSpec of the UDF that adapts a particular + // access_pattern to a user-facing view type. // // For example, View.asSingleton() may include a `view_fn` that adapts a // specially-designed multimap to a single value per window. - string view_fn_id = 2; + FunctionSpec view_fn = 2; - // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that - // maps a main input window to a side input window. + // (Required) The FunctionSpec of the UDF that maps a main input window + // to a side input window. // // For example, when the main input is in fixed windows of one hour, this // can specify that the side input should be accessed according to the day // in which that hour falls. - string window_mapping_fn_id = 3; + FunctionSpec window_mapping_fn = 3; } // An environment for executing UDFs. Generally an SDK container URL, but http://git-wip-us.apache.org/repos/asf/beam/blob/d390406e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java index 3047da1..7bc581c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java @@ -195,10 +195,6 @@ public class WindowingStrategies implements Serializable { public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy) throws IOException { - // TODO: have an inverted components to find the id for a thing already - // in the components - String windowFnId = UUID.randomUUID().toString(); - RunnerApi.MessageWithComponents windowFnWithComponents = toProto(windowingStrategy.getWindowFn()); @@ -209,16 +205,11 @@ public class WindowingStrategies implements Serializable { .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior())) .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) .setTrigger(Triggers.toProto(windowingStrategy.getTrigger())) - .setFnId(windowFnId); + .setWindowFn(windowFnWithComponents.getFunctionSpec()); return RunnerApi.MessageWithComponents.newBuilder() .setWindowingStrategy(windowingStrategyProto) - .setComponents( - windowFnWithComponents - .getComponents() - .toBuilder() - .putFunctionSpecs(windowFnId, windowFnWithComponents.getFunctionSpec())) - .build(); + .setComponents(windowFnWithComponents.getComponents()).build(); } /** @@ -246,10 +237,7 @@ public class WindowingStrategies implements Serializable { RunnerApi.WindowingStrategy proto, RunnerApi.Components components) throws InvalidProtocolBufferException { - FunctionSpec windowFnSpec = - components - .getFunctionSpecsMap() - .get(proto.getFnId()); + FunctionSpec windowFnSpec = proto.getWindowFn(); checkArgument( windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),