Repository: beam Updated Branches: refs/heads/master cd6802bec -> 490ef8f09
Add proto definition for Runner API Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5ce3b43 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5ce3b43 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5ce3b43 Branch: refs/heads/master Commit: a5ce3b4380377ebbcc75933f8f0f8c3faeefdcf4 Parents: 9ec22f1 Author: Kenneth Knowles <[email protected]> Authored: Tue Feb 7 15:25:32 2017 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Thu Feb 9 14:45:48 2017 -0800 ---------------------------------------------------------------------- sdks/common/pom.xml | 1 + sdks/common/runner-api/pom.xml | 91 +++ .../src/main/proto/beam_runner_api.proto | 638 +++++++++++++++++++ 3 files changed, 730 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a5ce3b43/sdks/common/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml index 8364d9a..55db181 100644 --- a/sdks/common/pom.xml +++ b/sdks/common/pom.xml @@ -34,5 +34,6 @@ <modules> <module>fn-api</module> + <module>runner-api</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/a5ce3b43/sdks/common/runner-api/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/pom.xml b/sdks/common/runner-api/pom.xml new file mode 100644 index 0000000..8eaeb8e --- /dev/null +++ b/sdks/common/runner-api/pom.xml @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <packaging>jar</packaging> + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-common-parent</artifactId> + <version>0.6.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-common-runner-api</artifactId> + <name>Apache Beam :: SDKs :: Common :: Runner API</name> + <description>This artifact generates the stub bindings.</description> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + <resource> + <directory>${project.build.directory}/original_sources_to_package</directory> + </resource> + </resources> + + <plugins> + <!-- Skip the checkstyle plugin on the protoc-generated code --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <!-- Skip the findbugs plugin on the protoc-generated code --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <!-- Compile the .proto files to Java (compile) and run the grpc-java plugin (compile-custom) --> <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/a5ce3b43/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto new file mode 100644 index 0000000..195ce01 --- /dev/null +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -0,0 +1,638 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +/* + * Protocol Buffers describing the Runner API, which is the runner-independent, + * SDK-independent definition of the Beam model. + */ + +syntax = "proto3"; + +package org.apache.beam.runner_api.v1; + +option java_package = "org.apache.beam.sdks.common.runner_api.v1"; +option java_outer_classname = "RunnerApi"; + +import "google/protobuf/any.proto"; +import "google/protobuf/timestamp.proto"; + +// A Pipeline is a hierarchical graph of PTransforms, linked +// by PCollections. +// +// This is represented by a number of by-reference maps to nodes, +// PCollections, SDK environments, UDFs, etc., for +// supporting compact reuse and arbitrary graph structure. +// +// All of the keys in the maps here are arbitrary strings that are only +// required to be internally consistent within this proto message. +message Pipeline { + + // (Required) A map from pipeline-scoped id to GraphNode<PTransform, PCollection>. + // + // Each node is required to contain a PTransform specification. + map<string, GraphNode> transform_nodes = 1; + + // (Required) A map from pipeline-scoped id to PCollection. + map<string, PCollection> pcollections = 2; + + // (Required) A map from pipeline-scoped id to WindowingStrategy. + map<string, WindowingStrategy> windowing_strategies = 3; + + // (Required) A map from pipeline-scoped id to Coder. + map<string, Coder> coders = 4; + + // (Required) A map from pipeline-scoped id to Environment. + map<string, Environment> environments = 5; + + // (Required) A map from pipeline-scoped id to FunctionSpec, + // a record for a particular user-defined function. + map<string, FunctionSpec> function_specs = 6; + + // (Required) Static display data for the pipeline. + DisplayData display_data = 7; +} + +// A generic node in a bipartite directed hierarchical graph. +// +// You can think of this as a GraphNode<NodeT, ConnectionT> where +// NodeT and ConnectionT are the two types of nodes in the bipartite +// graph.
+// +// There is one NodeT on each GraphNode, while the ConnectionT values +// are named and ordered in the `inputs` and `outputs` fields. For +// arbitrary graph structures, they are expected to be by-reference. +// +// For the Runner API, the type is GraphNode<PTransform, PCollection>. +message GraphNode { + + // (Required) A URN that describes what kind of graph node this is. + // + // Specifically, the URN should be enough to decipher the inputs, + // outputs, and payload. + string urn = 1; + + // (Optional) If this node is contained within a composite, the id of the + // parent node. + string parent_id = 2; + + // (Required) A list of ordered, named inputs to this node. + // + // The URN for the graph node may clarify the type of the inputs + // (resp. outputs). For example: + // + // - in the Runner API these are PCollections + // - in the Fn API they may be Grpc ports + // + // The payload for this graph node may clarify the relationship of these + // inputs. For example: + // + // - for a Flatten transform they are merged + // - for a ParDo transform, some may be side inputs + // + // All inputs are recorded here so that the topological ordering of + // the graph is consistent whether or not the payload is understood. + // + repeated Connection inputs = 3; + + // (Required) A list of ordered, named outputs from this node. + // + // The URN or payload for the graph node may clarify the type and + // relationship of these. For example: + // + // - for a ParDo transform, these are tags on PCollections, which will be + // embedded in the DoFn. + // - in the Runner API the targets are PCollections + // - in the Fn API the targets may be Grpc ports + // + repeated Connection outputs = 4; + + // (Required) A payload fully specifying this node. + // Schema is determined by the URN. + google.protobuf.Any payload = 5; + + message Connection { // A named input or output of a GraphNode. + string name = 1; // The name of this input or output. + google.protobuf.Any target = 2; // The connected value; its type is clarified by the node's URN. + } +} + +// A PCollection: the data linking PTransforms in a Pipeline.
+message PCollection { + + // (Required) A unique name for the PCollection. + // + // Ideally, this should be stable over multiple evolutions of a pipeline + // for the purposes of logging and associating pipeline state with a node, + // etc. + // + // If it is not stable, then the runner decides what will happen. But, most + // importantly, it must always be here, even if it is autogenerated. + string unique_name = 1; + + // (Required) The pipeline-scoped id of the Coder for this PCollection. + string coder_id = 2; + + // (Required) Whether this PCollection is bounded or unbounded. + IsBounded is_bounded = 3; + + // (Required) The pipeline-scoped id of the windowing strategy for this PCollection. + string windowing_strategy_id = 4; + + // (Required) Static display data for this PCollection. + DisplayData display_data = 5; +} + +// An applied PTransform. This does not contain the graph data, but only the +// fields specific to a graph node that is a Runner API transform +// between PCollections. +message PTransform { + + // (Required) A unique name for the application node. + // + // Ideally, this should be stable over multiple evolutions of a pipeline + // for the purposes of logging and associating pipeline state with a node, + // etc. + // + // If it is not stable, then the runner decides what will happen. But, most + // importantly, it must always be here, even if it is autogenerated. + string unique_name = 1; + + // (Optional) A URN and payload that, together, fully define the semantics + // of this transform. + // + // If absent, this must be an "anonymous" composite transform.
+ // + // For primitive transforms in the Runner API, this is required, and the + // payloads are as follows: + // + // - when the URN is "urn:beam:transforms:pardo" it is a ParDoPayload + // - when the URN is "urn:beam:transforms:read" it is a ReadPayload + // - when the URN is "urn:beam:transforms:gbk" it is a GroupByKeyPayload (TODO: not yet defined) + // - when the URN is "urn:beam:transforms:window" it is a WindowIntoPayload + // - when the URN is "urn:beam:transforms:flatten" it is absent + // + // For some special composite transforms, the payload is also well-defined: + // + // - when the URN is "urn:beam:transforms:combine" it is a CombinePayload + // + UrnWithParameter spec = 2; + + // (Required) Static display data for this PTransform application. + DisplayData display_data = 4; +} + +// The payload for the primitive ParDo transform. +message ParDoPayload { + + // (Required) The pipeline-scoped id of the FunctionSpec for the DoFn. + string fn_id = 1; + + // (Required) Additional pieces of context the DoFn may require that + // are not otherwise represented in the payload. + // (may force runners to execute the ParDo differently) + repeated Parameter parameters = 2; + + // (Optional) An ordered list of side inputs, mapping each local name + // to the data to be provided and the expected access pattern. + // (the SDK may not be order-sensitive) + repeated SideInput side_inputs = 3; + + // (Optional) If the DoFn uses state, a list of the specs for its state cells. + repeated StateSpec state_specs = 4; + + // (Optional) If the DoFn uses timers, a list of the specs for its timers. + repeated TimerSpec timer_specs = 5; +} + +// Parameters that a UDF might require. +// +// The details of how a runner sends these parameters to the SDK harness +// are the subject of the Fn API. +// +// The details of how an SDK harness delivers them to the UDF is entirely +// up to the SDK.
(for some SDKs there may be parameters that are not +// represented here if the runner doesn't need to do anything) +// +// Here, the parameters are simply indicators to the runner that they +// need to run the function a particular way. +// +// TODO: the evolution of the Fn API will influence what needs explicit +// representation here +message Parameter { + Type type = 1; + + enum Type { + WINDOW = 0; + PIPELINE_OPTIONS = 1; + RESTRICTION_TRACKER = 2; + } +} + +message StateSpec { // The spec for one state cell used by a DoFn. + // TODO: AST for state spec +} + +message TimerSpec { // The spec for one timer used by a DoFn. + // TODO: AST for timer spec +} + +enum IsBounded { // Whether a PCollection or source is bounded or unbounded. + BOUNDED = 0; + UNBOUNDED = 1; +} + +// The payload for the primitive Read transform. +message ReadPayload { + + // (Required) The pipeline-scoped id of the FunctionSpec of the source for + // this Read. + string source_id = 1; + + // (Required) Whether the source is bounded or unbounded. + IsBounded is_bounded = 2; + + // TODO: full audit of fields required by runners as opposed to SDK harness +} + +// The payload for the WindowInto transform. +message WindowIntoPayload { + // (Required) The pipeline-scoped id for the FunctionSpec of the WindowFn. + string fn_id = 1; +} + +// The payload for the special-but-not-primitive Combine transform. +message CombinePayload { + + // (Required) The pipeline-scoped id of the FunctionSpec for the CombineFn. + string fn_id = 1; + + // (Required) The pipeline-scoped id of the Coder to use for accumulators of the CombineFn. + string accumulator_coder_id = 2; + + // (Required) Additional pieces of context the CombineFn may require that + // are not otherwise represented in the payload. + // (may force runners to execute the Combine differently) + repeated Parameter parameters = 3; + + // (Optional) An ordered list of side inputs, mapping each local name + // to the data to be provided and the expected access pattern.
+ // (the SDK may not be order-sensitive) + repeated SideInput side_inputs = 4; +} + +// A coder, the binary format for serialization and deserialization of data in +// a pipeline. +message Coder { + + // (Required) A cross-language, stable, unique identifier for the (possibly + // parametric) encoding. + string urn = 1; + + // (Optional) If this coder is parametric, such as ListCoder(VarIntCoder), + // this is a list of the pipeline-scoped ids of the component coders. In order for encodings to be identical, + // the URN and all components must be identical, recursively. + repeated string component_coder_ids = 2; + + // (Optional) The pipeline-scoped id for the FunctionSpec of an SDK-specific + // UDF implementing the encoding. + string custom_coder_fn_id = 3; +} + +// A windowing strategy describes the window function, triggering, allowed +// lateness, and accumulation mode for a PCollection. +// +// TODO: consider inlining field on PCollection +message WindowingStrategy { + + // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that + // assigns windows, merges windows, and shifts timestamps before they are + // combined according to the OutputTime. + string fn_id = 1; + + // (Required) Whether or not the window fn is merging. + // + // This knowledge is required for many optimizations. + MergeStatus merge_status = 2; + + // (Required) The pipeline-scoped id of the coder for the windows of this PCollection. + string window_coder_id = 3; + + // (Required) The trigger to use when grouping this PCollection. + Trigger trigger = 4; + + // (Required) The accumulation mode indicates whether new panes are a full + // replacement for prior panes or whether they are deltas to be combined + // with other panes (the combine should correspond to whatever the upstream + // grouping transform is). + AccumulationMode accumulation_mode = 5; + + // (Required) The OutputTime specifies, for a grouping transform, how to + // compute the aggregate timestamp.
The window_fn will first possibly shift + // it later, then the OutputTime takes the max, min, or ignores it and takes + // the end of window. + // + // This is actually only for input to grouping transforms, but since they + // may be introduced in runner-specific ways, it is carried along with the + // windowing strategy. + OutputTime output_time = 6; + + // (Required) The duration, in milliseconds, beyond the end of a window at + // which the window becomes droppable. + int64 allowed_lateness = 7; +} + +// Whether or not a PCollection's WindowFn is non-merging, merging, or +// merging-but-already-merged, in which case a subsequent GroupByKey is almost +// always going to do something the user does not want. +enum MergeStatus { + // The WindowFn does not require merging. + // Examples: global window, FixedWindows, SlidingWindows + NON_MERGING = 0; + + // The WindowFn is merging and the PCollection has not had merging + // performed. + // Example: Sessions prior to a GroupByKey + NEEDS_MERGE = 1; + + // The WindowFn is merging and the PCollection has had merging occur + // already. + // Example: Sessions after a GroupByKey + ALREADY_MERGED = 2; +} + +// Whether or not subsequent outputs of aggregations should be entire +// replacement values or just the aggregation of inputs received since +// the prior output. +enum AccumulationMode { + + // The aggregation is discarded when it is output. + DISCARDING = 0; + + // The aggregation is accumulated across outputs. + ACCUMULATING = 1; +} + +// When a number of windowed, timestamped inputs are aggregated, the timestamp +// for the resulting output. +enum OutputTime { + // The output has the timestamp of the end of the window. + END_OF_WINDOW = 0; + + // The output has the latest timestamp of the input elements since + // the last output. + LATEST_IN_PANE = 1; + + // The output has the earliest timestamp of the input elements since + // the last output.
+ EARLIEST_IN_PANE = 2; +} + +// A small DSL for expressing when to emit new aggregations +// from a GroupByKey or CombinePerKey. +// +// A trigger is described in terms of when it is _ready_ to permit output. +message Trigger { + + // Ready when all subtriggers are ready. + message AfterAll { + repeated Trigger subtriggers = 1; + } + + // Ready when any subtrigger is ready. + message AfterAny { + repeated Trigger subtriggers = 1; + } + + // Starting with the first subtrigger, ready when the _current_ subtrigger + // is ready. After output, advances the current subtrigger by one. + message AfterEach { + repeated Trigger subtriggers = 1; + } + + // Ready after the input watermark is past the end of the window. + // + // May have implicitly-repeated subtriggers for early and late firings. + // When the end of the window is reached, the trigger transitions between + // the subtriggers. + message AfterEndOfWindow { + + // (Optional) A trigger governing output prior to the end of the window. + Trigger early_firings = 1; + + // (Optional) A trigger governing output after the end of the window. + Trigger late_firings = 2; + } + + // After input arrives, ready when the specified delay has passed. + message AfterProcessingTimeDelay { + // (Required) The delay, in milliseconds. + int64 delay_millis = 1; + } + + // After input arrives, ready when the synchronized processing time + // progresses as far as the given delay. + message AfterSynchronizedProcessingTimeDelay { + // (Required) The delay, in milliseconds. + int64 delay_millis = 1; + } + + // Never ready. There will only be an ON_TIME output and a final + // output at window expiration. + message Never { } + + // Ready whenever the subtrigger is ready; resets state when the subtrigger + // completes. + message Repeat { + // (Required) Trigger that is run repeatedly. + Trigger subtrigger = 1; + } + + // Ready whenever either of its subtriggers is ready, but finishes output + // when the finally subtrigger fires.
+ message OrFinally { + + // (Required) Trigger governing main output; may fire repeatedly. + Trigger main = 1; + + // (Required) Trigger governing termination of output. + Trigger finally = 2; + } + + // The default trigger. Equivalent to Repeat { AfterEndOfWindow } but + // specially denoted to indicate the user did not alter the triggering. + message Default { } + + // The full disjoint union of possible triggers. + oneof trigger { + AfterAll after_all = 1; + AfterAny after_any = 2; + AfterEach after_each = 3; + AfterEndOfWindow after_end_of_window = 4; + AfterProcessingTimeDelay after_processing_time_delay = 5; + AfterSynchronizedProcessingTimeDelay after_synchronized_processing_time_delay = 6; + Never never = 7; + Repeat repeat = 8; + OrFinally or_finally = 9; + Default default = 10; + } +} + +// A specification for how to "side input" a PCollection. +message SideInput { + + // (Required) A local name for this side input, as embedded in a serialized UDF. + string name = 1; + + // (Required) The pipeline-scoped unique id of the PCollection to be side input. + string pcollection_id = 2; + + // (Required) URN of the access pattern required by the `view_fn` to present + // the desired SDK-specific interface to a UDF. + // + // This access pattern defines the SDK harness <-> Runner Harness RPC + // interface for accessing a side input. + // + // The only access pattern intended for Beam, because of its superior + // performance possibilities, is "urn:beam:sideinput:multimap" (or some such + // URN). + string access_pattern = 3; + + // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that + // adapts a particular access_pattern to a user-facing view type. + // + // For example, View.asSingleton() may include a `view_fn` that adapts a + // specially-designed multimap to a single value per window. + string view_fn_id = 4; + + // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that + // maps a main input window to a side input window.
+ // + // For example, when the main input is in fixed windows of one hour, this + // can specify that the side input should be accessed according to the day + // in which that hour falls. + string window_mapping_fn_id = 5; +} + +// An environment for executing UDFs. Generally an SDK container URL, but +// there can be many for a single SDK, for example to provide dependency +// isolation. +message Environment { + + // (Required) The URL of a container. + // + // TODO: reconcile with Fn API's DockerContainer structure by + // adding adequate metadata to know how to interpret the container + string url = 1; +} + +// Description of a function in a Beam pipeline. +// +// Contains one of _or both of_ a UrnWithParameter specifying the function +// and the specification for how to execute it against a particular +// SDK's harness. +message FunctionSpec { + + // (Optional) An SDK-independent specification of this function. + // If present, this must _fully_ specify the function. + // + // For example, the distinguished URN "urn:beam:windowfn:FixedWindows" with + // parameter `{ duration: n }` fully specifies a windowing function which can + // be implemented by the SDK constructing the pipeline, by another SDK (for + // language-to-language fusion compatibility) or by the runner directly. + UrnWithParameter spec = 1; + + // (Optional) An SDK-specific specification for how to execute this function, + // including a specification of the environment in which the function + // can be interpreted and executed. + SdkFunctionSpec sdk_fn_spec = 2; +} + +// A URN along with a parameter object whose schema is determined by the +// URN. +// +// The URN will often specify a parametric function or transform such as +// "Top" or "FixedWindows" while the parameter would specify _n_ or +// _duration_, respectively. +message UrnWithParameter { + + // (Required) A URN that describes the accompanying parameter. + string urn = 1; + + // (Optional) The data specifying any parameters to the URN.
If + // the URN does not require any arguments, this may be omitted. + google.protobuf.Any parameter = 2; +} + +// An arbitrary payload tagged with the environment that knows how to +// interpret it as a user-defined function. +message SdkFunctionSpec { + + // (Required) The pipeline-scoped id of the Environment + // required to invoke this function. + string environment_id = 2; + + // (Required) The raw data of the function that the SDK knows how to + // deserialize, but need not be comprehensible to any other runner, SDK, or + // other entity. + google.protobuf.Any data = 4; +} + +// TODO: transfer javadoc here +message DisplayData { + + // (Required) The list of display data. + repeated Item items = 1; + + // A complete identifier for a DisplayData.Item. + message Identifier { + + // (Required) The transform originating this display data. + string transform_id = 1; + + // (Optional) The URN indicating the type of the originating transform, + // if there is one. + string transform_urn = 2; + + string key = 3; // NOTE(review): presumably the item's key within the originating transform; confirm whether required. + } + + // A single item of display data. + message Item { + // (Required) The identifier of this item. + Identifier id = 1; + + // (Required) The type of this item; presumably describes how to interpret `value`. + Type type = 2; + + // (Required) The value of this item. + google.protobuf.Any value = 3; + + // (Optional) Presumably an abbreviated form of `value`; confirm. + google.protobuf.Any short_value = 4; + + // (Optional) A label for this item. + string label = 5; + + // (Optional) A URL associated with this item. + string link_url = 6; + } + + enum Type { + STRING = 0; INTEGER = 1; FLOAT = 2; BOOLEAN = 3; TIMESTAMP = 4; DURATION = 5; JAVA_CLASS = 6; + } +}
