Luke Cwik created BEAM-3787:
-------------------------------
Summary: Migrate Fn API to be bidirectional instruction/request
stream
Key: BEAM-3787
URL: https://issues.apache.org/jira/browse/BEAM-3787
Project: Beam
Issue Type: Improvement
Components: beam-model
Reporter: Luke Cwik
Assignee: Robert Bradshaw
Allow the SDK to request the Runner to do something on its behalf. This
mechanism can be used for:
* Reporting final counters
* Work shedding (SDK can choose to reduce the amount of work it wants to do
(checkpointing))
* Requesting process bundle descriptors (instead of requiring the Runner to
send them and have the SDK cache them).
* Decoupling the message type in control allows for new types of messages to be
added which are not one to one.
Example API change below (note that SdkMessage/RunnerMessage should use a
different name):
// An API that describes control messages between the SDK and Runner to process
// bundles, split bundles, report progress, ...
service BeamFnControl {
//
rpc Control(
// A stream of SDK requests/responses.
stream SdkMessage
) returns (
// A stream of Runner requests/responses.
stream RunnerMessage
) {}
}
// Messages a Runner can send over the control plane.
message RunnerMessage {
// (Required) An unique identifier provided by the runner which represents
// this requests execution. The RunnerInstructionResponse MUST have the
matching id.
string id = 1;
oneof message {
ErrorResponse error = 999;
RegisterRequest register = 1000;
ProcessBundleRequest process_bundle = 1001;
ProcessBundleProgressRequest process_bundle_progress = 1002;
ProcessBundleSplitRequest process_bundle_split = 1003;
ShedBundleResponse shed_bundle = 1000;
}
}
// Messages an SDK can send over the control plane.
message SdkMessage {
oneof message {
RunnerInstructionResponse runner_instruction_response = 1000;
SdkInstructionRequest sdk_instruction_request = 1001;
}
}
// A request sent by a runner which the SDK is asked to fulfill.
// For any unsupported request type, an error should be returned with a
// matching instruction id.
// Stable
message RunnerInstructionRequest {
// (Required) An unique identifier provided by the runner which represents
// this requests execution. The RunnerInstructionResponse MUST have the
matching id.
string instruction_id = 1;
// (Required) A request that the SDK Harness needs to interpret.
oneof request {
RegisterRequest register = 1000;
ProcessBundleRequest process_bundle = 1001;
ProcessBundleProgressRequest process_bundle_progress = 1002;
ProcessBundleSplitRequest process_bundle_split = 1003;
}
}
// The response for an associated request the SDK had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
// (Required) A reference provided by the runner which represents a requests
// execution. The RunnerInstructionResponse MUST have the matching id when
// responding to the runner.
string instruction_id = 1;
// If this is specified, then this instruction has failed.
// A human readable string representing the reason as to why processing has
// failed.
string error = 2;
// If the instruction did not fail, it is required to return an equivalent
// response type depending on the request this matches.
oneof response {
RegisterResponse register = 1000;
ProcessBundleResponse process_bundle = 1001;
ProcessBundleProgressResponse process_bundle_progress = 1002;
ProcessBundleSplitResponse process_bundle_split = 1003;
}
}
message SdkInstructionRequest {
// (Required) An unique identifier provided by the SDK which represents
// this requests execution. The SdkInstructionResponse MUST have the matching
id.
string instruction_id = 1;
// (Required) A request that the Runner needs to interpret.
oneof request {
ShedBundleRequest shed_bundle = 1000;
}
}
// The response for an associated request the Runner had been asked to fulfill.
// Stable
message RunnerInstructionResponse {
// (Required) A reference provided by the SDK which represents a requests
// execution. The RunnerInstructionResponse MUST have the matching id when
// responding to the SDK.
string instruction_id = 1;
// If this is specified, then this instruction has failed.
// A human readable string representing the reason as to why processing has
// failed.
string error = 2;
// If the instruction did not fail, it is required to return an equivalent
// response type depending on the request this matches.
oneof response {
ShedBundleResponse shed_bundle = 1000;
}
}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)