Repository: beam Updated Branches: refs/heads/master 4daac6644 -> 6bf42622c
[BEAM-1347] Update protos related to State API for prototyping purposes. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/25d77b12 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/25d77b12 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/25d77b12 Branch: refs/heads/master Commit: 25d77b1240b366542c9094b2ab6373f520278cd0 Parents: 4daac66 Author: Luke Cwik <[email protected]> Authored: Wed Apr 12 15:28:35 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Fri Apr 14 09:10:08 2017 -0700 ---------------------------------------------------------------------- sdks/common/fn-api/pom.xml | 5 - .../fn-api/src/main/proto/beam_fn_api.proto | 174 +++++++++---------- 2 files changed, 82 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/25d77b12/sdks/common/fn-api/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml index e9253c2..e3a583b 100644 --- a/sdks/common/fn-api/pom.xml +++ b/sdks/common/fn-api/pom.xml @@ -82,11 +82,6 @@ <dependencies> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/25d77b12/sdks/common/fn-api/src/main/proto/beam_fn_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto index 80bae2e..79e1872 100644 --- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto +++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto @@ -199,7 +199,7 @@ message RemoteGrpcPort { * with the addition of new types of instructions/responses related 
to metrics. */ -// An API that describes the work that a SDK Fn Harness is meant to do. +// An API that describes the work that an SDK harness is meant to do. // Stable service BeamFnControl { // Instructions sent by the runner to the SDK requesting different types @@ -282,7 +282,13 @@ message ProcessBundleDescriptor { // A request to process a given bundle. // Stable message ProcessBundleRequest { + // (Required) A reference to the process bundle descriptor that must be + // instantiated and executed by the SDK harness. string process_bundle_descriptor_reference = 1; + + // (Optional) A list of cache tokens that can be used by an SDK to cache + // data looked up using the State API across multiple bundles. + repeated CacheToken cache_tokens = 2; } // Stable @@ -465,16 +471,12 @@ service BeamFnData { /* * State API - * - * This is just a high level sketch of how this could work. There is still - * a lot of work with respect to how the key spaces for the different types - * of access required (side inputs, user state, ...) and how state caching - * works across bundles. */ message StateRequest { // (Required) An unique identifier provided by the SDK which represents this - // requests execution. The StateResponse must have the matching id. + // request's execution. The StateResponse corresponding to this request + // will have the matching id. string id = 1; // (Required) The associated instruction id of the work that is currently @@ -482,17 +484,20 @@ message StateRequest { // to state to be committed with the appropriate work execution. string instruction_reference = 2; - // At least one of the following fields should be populated. - // Also, no request should use a state key referred to in another state key. + // (Required) The state key this request is for. + StateKey state_key = 3; - // (Optional) A request to get state. - repeated StateGetRequest get = 3; + // (Required) The action to take on this request. + oneof request { + // A request to get state.
+ StateGetRequest get = 1000; - // A request to append to state. - repeated StateAppendRequest append = 4; + // A request to append to state. + StateAppendRequest append = 1001; - // (Optional) A request to clear state. - repeated StateClearRequest clear = 5; + // A request to clear state. + StateClearRequest clear = 1002; + } } message StateResponse { @@ -501,31 +506,22 @@ message StateResponse { // to the SDK. string id = 1; - // (Required) The associated instruction id of the work that is currently - // being processed. - string instruction_reference = 2; - - // (Required) A key to associate with the version of this state. Allows for - // SDKs to share state across work items if they have the same cache key and - // state key. - bytes cache_key = 3; - // (Optional) If this is specified, then the state request has failed. // A human readable string representing the reason as to why the request // failed. - string error = 4; - - // For every field populated in the StateRequest, there is a matching field in - // the StateResponse. + string error = 2; - // (Optional) A response to getting state. - repeated StateGetResponse get = 5; + // A corresponding response matching the request will be populated. + oneof response { + // A response to getting state. + StateGetResponse get = 1000; - // (Optional) A response to appending to state. - repeated StateAppendResponse append = 6; + // A response to appending to state. + StateAppendResponse append = 1001; - // (Optional) A response to clearing state. - repeated StateClearResponse clear = 7; + // A response to clearing state. + StateClearResponse clear = 1002; + } } service BeamFnState { @@ -540,91 +536,84 @@ service BeamFnState { ) {} } +message CacheToken { + // (Required) Represents the function spec and tag associated with this cache + // token.
+ // + // By combining the function_spec_reference with the tag representing: + // * the input, we refer to the iterable portion of a large GBK + // * the side input, we refer to the side input + // * the user state, we refer to user state + Target target = 1; -// TODO: Resolve with the other State API. -service SimpleBeamFnState { - // Gets the elements associated with the given key. - rpc Get(StateKey) returns (Elements.Data) {} - // Appends elements to a given state bag. - rpc Append(SimpleStateAppendRequest) returns (Empty) {} - // Clears a given state bag. - rpc Clear(StateKey) returns (Empty) {} -} - -message Empty { -} - -message SimpleStateAppendRequest { - StateKey state_key = 1; - repeated bytes data = 2; + // (Required) An opaque identifier. + bytes token = 2; } message StateKey { - // (Required) Represents the namespace for the state. If this state is for a - // DoFn, then this reference is expected to point to the DoFn. If this state - // is for a side input, then this is expected to reference the ViewFn. - string function_spec_reference = 1; + // (Required) Represents the function spec and tag associated with this state + // key. + // + // By combining the function_spec_reference with the tag representing: + // * the input, we refer to fetching the iterable portion of a large GBK + // * the side input, we refer to fetching the side input + // * the user state, we refer to fetching user state + Target target = 1; // (Required) The bytes of the window which this state request is for encoded - // in the outer context. + // in the nested context. bytes window = 2; - // (Required) The user key for which the value was encoded in the outer - // context. + // (Required) The user key encoded in the nested context. bytes key = 3; } -message StateKeyOrIterable { - // One of the two fields below are required to be set. - // If state key is set, then the State API should be invoked to fetch the - // values allowing one to restart the iterable. 
Otherwise the bytes for the - // entire iterable are represented and should be decoded using an iterable - // coder using the outer context. - StateKey state_key = 1; - repeated bytes iterable = 2; +// A logical byte stream which can be continued using the state API. +message ContinuableStream { + // (Optional) If specified, represents a token which can be used with the + // state API to get the next chunk of this logical byte stream. The end of + // the logical byte stream is signalled by this field being unset. + bytes continuation_token = 1; + + // Represents a part of a logical byte stream. Elements within + // the logical byte stream are encoded in the nested context and + // concatenated together. + bytes data = 2; } -// A request to get state for the given state key. +// A request to get state. message StateGetRequest { - // A state key encoded in the outer context. - StateKey state_key = 1; + // (Optional) If specified, signals to the runner that the response + // should resume from the following continuation token. + // + // If unspecified, signals to the runner that the response should start + // from the beginning of the logical continuable stream. + bytes continuation_token = 1; } -// A response to get state for the given state key. +// A response to get state. message StateGetResponse { - // A state key encoded in the outer context. - StateKey state_key = 1; - - oneof state { - // A description of an input port which will stream the state data. - RemoteGrpcPort remote_grpc_port = 1000; - } + // (Required) The response containing a continuable logical byte stream. + ContinuableStream stream = 1; } -// A request to append state for the given state key. +// A request to append state. message StateAppendRequest { - // A state key encoded in the outer context. - StateKey state_key = 1; + // Represents a part of a logical byte stream. 
Elements within + // the logical byte stream are encoded in the nested context and + // multiple append requests are concatenated together. + bytes data = 1; } -// A response to append state for the given state key. +// A response to append state. message StateAppendResponse { - // A state key encoded in the outer context. - StateKey state_key = 1; - - oneof state { - // A description of an output port which to stream the state data to. - RemoteGrpcPort remote_grpc_port = 1000; - } } -// A request to clear state for the given state key. +// A request to clear state. message StateClearRequest { - // A state key encoded in the outer context. - StateKey state_key = 1; } -// A response to clear state for the given state key. +// A response to clear state. message StateClearResponse { } @@ -753,7 +742,7 @@ message OAuth2ClientCredentialsGrant { string url = 1; } -// A Docker container configuration for launching the SDK Fn Harness to execute +// A Docker container configuration for launching the SDK harness to execute // user specified functions. message DockerContainer { // (Required) A pipeline level unique id which can be used as a reference to @@ -769,3 +758,4 @@ message DockerContainer { // requiring additional configuration by a runner. string registry_reference = 3; } +
