[ 
https://issues.apache.org/jira/browse/BEAM-3741?focusedWorklogId=80005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80005
 ]

ASF GitHub Bot logged work on BEAM-3741:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Mar/18 18:33
            Start Date: 13/Mar/18 18:33
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #4743: [BEAM-3741] Proto changes for splitting over Fn API
URL: https://github.com/apache/beam/pull/4743
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 28c75595024..89c73a7798f 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -41,6 +41,7 @@ option java_outer_classname = "BeamFnApi";
 import "beam_runner_api.proto";
 import "endpoints.proto";
 import "google/protobuf/timestamp.proto";
+import "google/protobuf/wrappers.proto";
 
 /*
  * Constructs that define the pipeline shape.
@@ -182,6 +183,48 @@ message ProcessBundleDescriptor {
   org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
 }
 
+// Represents a partition of the bundle into two bundles: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+//   to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+//   work under residual roots.
+// - The current bundle, if no further splits happen, will have done exactly
+//   the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
+message BundleSplit {
+  // One of the root applications specifying the scope of work for a bundle.
+  message Application {
+    // (Required) The primitive transform to which to pass the element
+    string ptransform_id = 1;
+
+    // (Required) Name of the transform's input to which to pass the element.
+    string input_id = 2;
+
+    // (Required) The encoded element to pass to the transform.
+    bytes element = 3;
+
+    // Approximate lower bounds on timestamps of elements that this PTransform
+    // will produce into each of its output PCollections, when invoked on this
+    // element. Keyed by the transform's local output name.
+    map<string, int64> output_watermarks = 4;
+
+    // Approximate fraction of all work of the current bundle (before split)
+    // represented by invoking this Application and its downstream applications.
+    // The sum of fraction_of_work between all primary_roots and residual_roots
+    // must add up to approximately 1.0.
+    google.protobuf.DoubleValue fraction_of_work = 5;
+  }
+
+  // Root applications that should replace the current bundle.
+  repeated Application primary_roots = 1;
+
+  // Root applications that have been removed from the current bundle and
+  // have to be executed in a separate bundle (e.g. in parallel on a different
+  // worker, or after the current bundle completes, etc.)
+  repeated Application residual_roots = 2;
+}
+
 // A request to process a given bundle.
 // Stable
 message ProcessBundleRequest {
@@ -199,6 +242,10 @@ message ProcessBundleResponse {
   // (Optional) If metrics reporting is supported by the SDK, this represents
   // the final metrics to record for this bundle.
   Metrics metrics = 1;
+
+  // (Optional) Specifies that the bundle has been split since the last
+  // ProcessBundleProgressResponse was sent.
+  BundleSplit split = 2;
 }
 
 // A request to report progress information for a given bundle.
@@ -330,6 +377,10 @@ message Metrics {
 message ProcessBundleProgressResponse {
   // (Required)
   Metrics metrics = 1;
+
+  // (Optional) Specifies that the bundle has been split since the last
+  // ProcessBundleProgressResponse was sent.
+  BundleSplit split = 2;
 }
 
 message ProcessBundleSplitRequest {
@@ -337,106 +388,22 @@ message ProcessBundleSplitRequest {
   // instruction id.
   string instruction_reference = 1;
 
-  // (Required) The fraction of work (when compared to the known amount of work)
-  // the process bundle request should try to split at.
-  double fraction = 2;
-}
-
-// urn:org.apache.beam:restriction:element-count:1.0
-message ElementCountRestriction {
-  // A restriction representing the number of elements that should be processed.
-  // Effectively the range [0, count]
-  int64 count = 1;
-}
-
-// urn:org.apache.beam:restriction:element-count-skip:1.0
-message ElementCountSkipRestriction {
-  // A restriction representing the number of elements that should be skipped.
-  // Effectively the range (count, infinity]
-  int64 count = 1;
-}
-
-// Each primitive transform that is splittable is defined by a restriction
-// it is currently processing. During splitting, that currently active
-// restriction (R_initial) is split into 2 components:
-//   * a restriction (R_done) representing all elements that will be fully
-//     processed
-//   * a restriction (R_todo) representing all elements that will not be fully
-//     processed
-//
-// where:
-//   R_initial = R_done ⋃ R_todo
-message PrimitiveTransformSplit {
-  // (Required) A reference to a primitive transform with the given id that
-  // is part of the active process bundle request with the given instruction
-  // id.
-  string primitive_transform_reference = 1;
-
-  // (Required) A function specification describing the restriction
-  // that has been completed by the primitive transform.
+  // Specifies that the runner would like the bundle to split itself using
+  // BundleSplit, and give up some of the work that the bundle hasn't started
+  // doing yet, so that it can be done in a separate bundle (perhaps in
+  // parallel with the current bundle).
   //
-  // For example, a remote GRPC source will have a specific urn and data
-  // block containing an ElementCountRestriction.
-  org.apache.beam.model.pipeline.v1.FunctionSpec completed_restriction = 2;
-
-  // (Required) A function specification describing the restriction
-  // representing the remainder of work for the primitive transform.
+  // The value is the fraction of unstarted work to keep. E.g. 0 means give up
+  // as much as possible of unstarted work (e.g. checkpoint), 0.5 means give
+  // up about half of the unstarted work, etc.
+  // This is a hint and the value is approximate.
   //
-  // FOr example, a remote GRPC source will have a specific urn and data
-  // block contain an ElemntCountSkipRestriction.
-  org.apache.beam.model.pipeline.v1.FunctionSpec remaining_restriction = 3;
+  // The value is relative to the current scope of work of the bundle.
+  google.protobuf.DoubleValue fraction_of_remainder = 2;
 }
 
 message ProcessBundleSplitResponse {
-  // (Optional) A set of split responses for a currently active work item.
-  //
-  // If primitive transform B is a descendant of primitive transform A and both
-  // A and B report a split. Then B's restriction is reported as an element
-  // restriction pair and thus the fully reported restriction is:
-  //   R = A_done
-  //     ⋃ (A_boundary ⋂ B_done)
-  //     ⋃ (A_boundary ⋂ B_todo)
-  //     ⋃ A_todo
-  // If there is a decendant of B named C, then C would similarly report a
-  // set of element pair restrictions.
-  //
-  // This restriction is processed and completed by the currently active process
-  // bundle request:
-  //   A_done ⋃ (A_boundary ⋂ B_done)
-  // and these restrictions will be processed by future process bundle requests:
-  //   A_boundary ⋂ B_todo (passed to SDF B directly)
-  //   A_todo (passed to SDF A directly)
-
-  // If primitive transform B and C are siblings and descendants of A and A, B,
-  // and C report a split. Then B and C's restrictions are relative to A's.
-  //   R = A_done
-  //     ⋃ (A_boundary ⋂ B_done)
-  //     ⋃ (A_boundary ⋂ B_todo)
-  //     ⋃ (A_boundary ⋂ B_todo)
-  //     ⋃ (A_boundary ⋂ C_todo)
-  //     ⋃ A_todo
-  // If there is no descendant of B or C also reporting a split, than
-  //   B_boundary = ∅ and C_boundary = ∅
-  //
-  // This restriction is processed and completed by the currently active process
-  // bundle request:
-  //   A_done ⋃ (A_boundary ⋂ B_done)
-  //          ⋃ (A_boundary ⋂ C_done)
-  // and these restrictions will be processed by future process bundle requests:
-  //   A_boundary ⋂ B_todo (passed to SDF B directly)
-  //   A_boundary ⋂ C_todo (passed to SDF C directly)
-  //   A_todo (passed to SDF A directly)
-  //
-  // Note that descendants splits should only be reported if it is inexpensive
-  // to compute the boundary restriction intersected with descendants splits.
-  // Also note, that the boundary restriction may represent a set of elements
-  // produced by a parent primitive transform which can not be split at each
-  // element or that there are intermediate unsplittable primitive transforms
-  // between an ancestor splittable function and a descendant splittable
-  // function which may have more than one output per element. Finally note
-  // that the descendant splits should only be reported if the split
-  // information is relatively compact.
-  repeated PrimitiveTransformSplit splits = 1;
+  // Empty.
 }
 
 /*


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 80005)
    Time Spent: 4h 20m  (was: 4h 10m)

> Proto changes for splitting over Fn API
> ---------------------------------------
>
>                 Key: BEAM-3741
>                 URL: https://issues.apache.org/jira/browse/BEAM-3741
>             Project: Beam
>          Issue Type: Sub-task
>          Components: beam-model
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Major
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to