+1 on making the resources part of a proto. Based upon what Henning linked
to, the provisioning API seems like an appropriate place to provide this
information.

Thomas, I believe the environment proto is the best place to add
information that a runner may want to know about upfront during pipeline
pipeline creation. I wouldn't stick this into PipelineOptions for the long
term.
If you have time to capture these thoughts and update the environment
proto, I would suggest going down that path. Otherwise anything short term
like PipelineOptions will do.

On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <[email protected]> wrote:

> For SDKs where the upper limit is constant and known upfront, why not
> communicate this along with the other harness resource info as part of the
> job submission?
>
> Regarding use of GRPC headers: Why not make this explicit in the proto
> instead?
>
> WRT runner dictating resource constraints: The runner actually may also
> not have that information. It would need to be supplied as part of the
> pipeline options? The cluster resource manager needs to allocate resources
> for both, the runner and the SDK harness(es).
>
> Finally, what can be done to unblock the Flink runner / Python until
> solution discussed here is in place? An extra runner option for SDK
> singleton on/off?
>
>
> On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <[email protected]> wrote:
>
>> Sounds good to me.
>> GRPC Header of the control channel seems to be a good place to add upper
>> bound information.
>> Added jiras:
>> https://issues.apache.org/jira/browse/BEAM-5166
>> https://issues.apache.org/jira/browse/BEAM-5167
>>
>> On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <[email protected]>
>> wrote:
>>
>>> Regarding resources: the runner can currently dictate the mem/cpu/disk
>>> resources that the harness is allowed to use via the provisioning api. The
>>> SDK harness need not -- and should not -- speculate on what else might be
>>> running on the machine:
>>>
>>>
>>> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>>>
>>> A realistic startup-time computation in the SDK harness would be
>>> something simple like: max(1, min(cpu*100, mem_mb/10)) say, and use that at
>>> most number of threads. Or just hardcode to 300. Or a user-provided value.
>>> Whatever the value is the maximum number of bundles in flight allowed at
>>> any given time and needs to be communicated to the runner via some message.
>>> Anything beyond would be rejected (but this shouldn't happen, because the
>>> runner should respect that number).
>>>
>>> A dynamic computation would use the same limits from the SDK, but take
>>> into account its own resource usage (incl. the usage by running bundles).
>>>
>>> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <[email protected]> wrote:
>>>
>>>> I am thinking upper bound to be more on the lines of theocratical upper
>>>> limit or any other static high value beyond which the SDK will reject
>>>> bundle verbosely. The idea is that SDK will not keep bundles in queue while
>>>> waiting on current bundles to finish. It will simply reject any additional
>>>> bundle.
>>>> Beyond this I don't have a good answer to dynamic upper bound. As SDK
>>>> does not have the complete picture of processes on the machine with which
>>>> it share resources, resources might not be a good proxy for upper bound
>>>> from the SDK point of view.
>>>>
>>>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Ankur, how would you expect an SDK to compute a realistic upper bound
>>>>> (upfront or during pipeline computation)?
>>>>>
>>>>> First thought that came to my mind was that the SDK would provide
>>>>> CPU/memory/... resourcing information and the runner making a judgement
>>>>> call as to whether it should ask the SDK to do more work or less but its
>>>>> not an explicit don't do more then X bundles in parallel.
>>>>>
>>>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Makes sense. Having exposed upper bound on concurrency with optimum
>>>>>> concurrency can give a good balance. This is good information to expose
>>>>>> while keeping the requirements from the SDK simple. SDK can publish 1 as
>>>>>> the optimum concurrency and upper bound to keep things simple.
>>>>>>
>>>>>> Runner introspection of upper bound on concurrency is important for
>>>>>> correctness while introspection of optimum concurrency is important for
>>>>>> efficiency. This separates efficiency and correctness requirements.
>>>>>>
>>>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I agree with Luke's observation, with the caveat that "infinite
>>>>>>> amount of bundles in parallel" is limited by the available resources. 
>>>>>>> For
>>>>>>> example, the Go SDK harness will accept an arbitrary amount of parallel
>>>>>>> work, but too much work will cause either excessive GC pressure with
>>>>>>> crippling slowness or an outright OOM. Unless it's always 1, a 
>>>>>>> reasonable
>>>>>>> upper bound will either have to be provided by the user or computed from
>>>>>>> the mem/cpu resources given. Of course, as some bundles takes more
>>>>>>> resources than others, so any static value will be an estimate or ignore
>>>>>>> resource limits.
>>>>>>>
>>>>>>> That said, I do not like that an "efficiency" aspect becomes a
>>>>>>> subtle requirement for correctness due to Flink internals. I fear that 
>>>>>>> road
>>>>>>> leads to trouble.
>>>>>>>
>>>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> The later case of having a of supporting single bundle execution at
>>>>>>>> a time on SDK and runner not using this flag is exactly the reason we 
>>>>>>>> got
>>>>>>>> into the Dead Lock here.
>>>>>>>> I agree with exposing SDK optimum concurrency level ( 1 in later
>>>>>>>> case ) and let runner decide to use it or not. But at the same time 
>>>>>>>> expect
>>>>>>>> SDK to handle infinite amount of bundles even if its not efficient.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Ankur
>>>>>>>>
>>>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I believe in practice SDK harnesses will fall into one of two
>>>>>>>>> capabilities, can process effectively an infinite amount of bundles in
>>>>>>>>> parallel or can only process a single bundle at a time.
>>>>>>>>>
>>>>>>>>> I believe it is more difficult for a runner to handle the latter
>>>>>>>>> case well and to perform all the environment management that would 
>>>>>>>>> make
>>>>>>>>> that efficient. It may be inefficient for an SDK but I do believe it 
>>>>>>>>> should
>>>>>>>>> be able to say that I'm not great at anything more then a single 
>>>>>>>>> bundle at
>>>>>>>>> a time but utilizing this information by a runner should be optional.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> To recap the discussion it seems that we have come-up with
>>>>>>>>>> following point.
>>>>>>>>>> SDKHarness Management and initialization.
>>>>>>>>>>
>>>>>>>>>>    1. Runner completely own the work assignment to SDKHarness.
>>>>>>>>>>    2. Runner should know the capabilities and capacity of
>>>>>>>>>>    SDKHarness and should assign work accordingly.
>>>>>>>>>>    3. Spinning up of SDKHarness is runner's responsibility and
>>>>>>>>>>    it can be done statically (a fixed pre configured number of 
>>>>>>>>>> SDKHarness) or
>>>>>>>>>>    dynamically or based on certain other configuration/logic which 
>>>>>>>>>> runner
>>>>>>>>>>    choose.
>>>>>>>>>>
>>>>>>>>>> SDKHarness Expectation. This is more in question and we should
>>>>>>>>>> outline the responsibility of SDKHarness.
>>>>>>>>>>
>>>>>>>>>>    1. SDKHarness should publish how many concurrent tasks it can
>>>>>>>>>>    execute.
>>>>>>>>>>    2. SDKHarness should start executing all the tasks items
>>>>>>>>>>    assigned in parallel in a timely manner or fail task.
>>>>>>>>>>
>>>>>>>>>> Also to add to simplification side. I think for better adoption,
>>>>>>>>>> we should have simple SDKHarness as well as simple Runner 
>>>>>>>>>> integration to
>>>>>>>>>> encourage integration with more runner. Also many runners might not 
>>>>>>>>>> expose
>>>>>>>>>> some of the internal scheduling characteristics so we should not 
>>>>>>>>>> expect
>>>>>>>>>> scheduling characteristics for runner integration. Moreover 
>>>>>>>>>> scheduling
>>>>>>>>>> characteristics can change based on pipeline type, infrastructure,
>>>>>>>>>> available resource etc. So I am a bit hesitant to add runner 
>>>>>>>>>> scheduling
>>>>>>>>>> specifics for runner integration.
>>>>>>>>>> A good balance between SDKHarness complexity and Runner
>>>>>>>>>> integration can be helpful in easier adoption.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Ankur
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Finding a good balance is indeed the art of portability, because
>>>>>>>>>>> the range of capability (and assumptions) on both sides is wide.
>>>>>>>>>>>
>>>>>>>>>>> It was originally the idea to allow the SDK harness to be an
>>>>>>>>>>> extremely simple bundle executer (specifically, single-threaded 
>>>>>>>>>>> execution
>>>>>>>>>>> one instruction at a time) however inefficient -- a more 
>>>>>>>>>>> sophisticated SDK
>>>>>>>>>>> harness would support more features and be more efficient. For the 
>>>>>>>>>>> issue
>>>>>>>>>>> described here, it seems problematic to me to send non-executable 
>>>>>>>>>>> bundles
>>>>>>>>>>> to the SDK harness under the expectation that the SDK harness will
>>>>>>>>>>> concurrently work its way deeply enough down the instruction queue 
>>>>>>>>>>> to
>>>>>>>>>>> unblock itself. That would be an extremely subtle requirement for 
>>>>>>>>>>> SDK
>>>>>>>>>>> authors and one practical question becomes: what should an SDK
>>>>>>>>>>> do with a bundle instruction that it doesn't have capacity to 
>>>>>>>>>>> execute? If
>>>>>>>>>>> a runner needs to make such assumptions, I think that information 
>>>>>>>>>>> should
>>>>>>>>>>> probably rather be explicit along the lines of proposal 1 -- i.e., 
>>>>>>>>>>> some
>>>>>>>>>>> kind of negotiation between resources allotted to the SDK harness (a
>>>>>>>>>>> preliminary variant are in the provisioning api) and what the SDK 
>>>>>>>>>>> harness
>>>>>>>>>>> in return can do (and a valid answer might be: 1 bundle at a time
>>>>>>>>>>> irrespectively of resources given) or a per-bundle special 
>>>>>>>>>>> "overloaded"
>>>>>>>>>>> error response. For other aspects, such as side input readiness, 
>>>>>>>>>>> the runner
>>>>>>>>>>> handles that complexity and the overall bias has generally been to 
>>>>>>>>>>> move
>>>>>>>>>>> complexity to the runner side.
>>>>>>>>>>>
>>>>>>>>>>> The SDK harness and initialization overhead is entirely SDK, job
>>>>>>>>>>> type and even pipeline specific. A docker container is also just a 
>>>>>>>>>>> process,
>>>>>>>>>>> btw, and doesn't inherently carry much overhead. That said, on a 
>>>>>>>>>>> single
>>>>>>>>>>> host, a static docker configuration is generally a lot simpler to 
>>>>>>>>>>> work with.
>>>>>>>>>>>
>>>>>>>>>>> Henning
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>>>
>>>>>>>>>>>> I think there needs to be a good balance between the SDK
>>>>>>>>>>>> harness capabilities/complexity and responsibilities. Additionally 
>>>>>>>>>>>> the user
>>>>>>>>>>>> will need to be able to adjust the runner behavior, since the type 
>>>>>>>>>>>> of
>>>>>>>>>>>> workload executed in the harness also is a factor. Elsewhere we 
>>>>>>>>>>>> already
>>>>>>>>>>>> discussed that the current assumption of a single SDK harness 
>>>>>>>>>>>> instance per
>>>>>>>>>>>> Flink task manager brings problems with it and that there needs to 
>>>>>>>>>>>> be more
>>>>>>>>>>>> than one way how the runner can spin up SDK harnesses.
>>>>>>>>>>>>
>>>>>>>>>>>> There was the concern that instantiation if multiple SDK
>>>>>>>>>>>> harnesses per TM host is expensive (resource usage, initialization 
>>>>>>>>>>>> time
>>>>>>>>>>>> etc.). That may hold true for a specific scenario, such as batch 
>>>>>>>>>>>> workloads
>>>>>>>>>>>> and the use of Docker containers. But it may look totally 
>>>>>>>>>>>> different for a
>>>>>>>>>>>> streaming topology or when SDK harness is just a process on the 
>>>>>>>>>>>> same host.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> SDK harnesses were always responsible for executing all work
>>>>>>>>>>>>> given to it concurrently. Runners have been responsible for 
>>>>>>>>>>>>> choosing how
>>>>>>>>>>>>> much work to give to the SDK harness in such a way that best 
>>>>>>>>>>>>> utilizes the
>>>>>>>>>>>>> SDK harness.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I understand that multithreading in python is inefficient due
>>>>>>>>>>>>> to the global interpreter lock, it would be upto the runner in 
>>>>>>>>>>>>> this case to
>>>>>>>>>>>>> make sure that the amount of work it gives to each SDK harness 
>>>>>>>>>>>>> best
>>>>>>>>>>>>> utilizes it while spinning up an appropriate number of SDK 
>>>>>>>>>>>>> harnesses.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for looking into this problem. The cause seems to be
>>>>>>>>>>>>>> Flink's
>>>>>>>>>>>>>> pipelined execution mode. It runs multiple tasks in one task
>>>>>>>>>>>>>> slot and
>>>>>>>>>>>>>> produces a deadlock when the pipelined operators schedule the
>>>>>>>>>>>>>> SDK
>>>>>>>>>>>>>> harness DoFns in non-topological order.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The problem would be resolved if we scheduled the tasks in
>>>>>>>>>>>>>> topological
>>>>>>>>>>>>>> order. Doing that is not easy because they run in separate
>>>>>>>>>>>>>> Flink
>>>>>>>>>>>>>> operators and the SDK Harness would have to have insight into
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> execution graph (which is not desirable).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The easiest method, which you proposed in 1) is to ensure
>>>>>>>>>>>>>> that the
>>>>>>>>>>>>>> number of threads in the SDK harness matches the number of
>>>>>>>>>>>>>> ExecutableStage DoFn operators.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The approach in 2) is what Flink does as well. It glues
>>>>>>>>>>>>>> together
>>>>>>>>>>>>>> horizontal parts of the execution graph, also in multiple
>>>>>>>>>>>>>> threads. So I
>>>>>>>>>>>>>> agree with your proposed solution.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > tl;dr Dead Lock in task execution caused by limited task
>>>>>>>>>>>>>> parallelism on
>>>>>>>>>>>>>> > SDKHarness.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >   * Job type: /*Beam Portable Python Batch*/ Job on Flink
>>>>>>>>>>>>>> standalone
>>>>>>>>>>>>>> >     cluster.
>>>>>>>>>>>>>> >   * Only a single job is scheduled on the cluster.
>>>>>>>>>>>>>> >   * Everything is running on a single machine with single
>>>>>>>>>>>>>> Flink task
>>>>>>>>>>>>>> >     manager.
>>>>>>>>>>>>>> >   * Flink Task Manager Slots is 1.
>>>>>>>>>>>>>> >   * Flink Parallelism is 1.
>>>>>>>>>>>>>> >   * Python SDKHarness has 1 thread.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>>>> > With multi stage job, Flink schedule different dependent
>>>>>>>>>>>>>> sub tasks
>>>>>>>>>>>>>> > concurrently on Flink worker as long as it can get slots.
>>>>>>>>>>>>>> Each map tasks
>>>>>>>>>>>>>> > are then executed on SDKHarness.
>>>>>>>>>>>>>> > Its possible that MapB gets to SDKHarness before MapA and
>>>>>>>>>>>>>> hence gets
>>>>>>>>>>>>>> > into the execution queue before MapA. Because we only have
>>>>>>>>>>>>>> 1 execution
>>>>>>>>>>>>>> > thread on SDKHarness, MapA will never get a chance to
>>>>>>>>>>>>>> execute as MapB
>>>>>>>>>>>>>> > will never release the execution thread. MapB will wait for
>>>>>>>>>>>>>> input from
>>>>>>>>>>>>>> > MapA. This gets us to a dead lock in a simple pipeline.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>>>> > Set worker_count in pipeline options more than the expected
>>>>>>>>>>>>>> sub tasks
>>>>>>>>>>>>>> > in pipeline.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >  1. We can get the maximum concurrency from the runner and
>>>>>>>>>>>>>> make sure
>>>>>>>>>>>>>> >     that we have more threads than max concurrency. This
>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>> >     assumes that Beam has insight into runner execution
>>>>>>>>>>>>>> plan and can
>>>>>>>>>>>>>> >     make decision based on it.
>>>>>>>>>>>>>> >  2. We dynamically create thread and cache them with a high
>>>>>>>>>>>>>> upper bound
>>>>>>>>>>>>>> >     in SDKHarness. We can warn if we are hitting the upper
>>>>>>>>>>>>>> bound of
>>>>>>>>>>>>>> >     threads. This approach assumes that runner does a good
>>>>>>>>>>>>>> job of
>>>>>>>>>>>>>> >     scheduling and will distribute tasks more or less
>>>>>>>>>>>>>> evenly.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > We expect good scheduling from runners so I prefer approach
>>>>>>>>>>>>>> 2. It is
>>>>>>>>>>>>>> > simpler to implement and the implementation is not runner
>>>>>>>>>>>>>> specific. This
>>>>>>>>>>>>>> > approach better utilize resource as it creates only as many
>>>>>>>>>>>>>> threads as
>>>>>>>>>>>>>> > needed instead of the peak thread requirement.
>>>>>>>>>>>>>> > And last but not the least, it gives runner control over
>>>>>>>>>>>>>> managing truly
>>>>>>>>>>>>>> > active tasks.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Please let me know if I am missing something and your
>>>>>>>>>>>>>> thoughts on the
>>>>>>>>>>>>>> > approach.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>>> > Ankur
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>
>>>>>>>>>>>>>

Reply via email to